stevenzwu commented on code in PR #6407:
URL: https://github.com/apache/iceberg/pull/6407#discussion_r1046286159
##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java:
##########
@@ -220,7 +212,7 @@ public FlinkInputFormat buildFormat() {
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
return new FlinkInputFormat(
- tableLoader, icebergSchema, io, encryption, contextBuilder.build());
+ (SerializableTable) SerializableTable.copyOf(table), contextBuilder.build());
Review Comment:
Personally, I like that in this PR `FlinkInputFormat` uses `SerializableTable` as the arg type, so it is clear whether we are dealing with a `SerializableTable` or a regular table. I see other places just use `Table` as the arg type to avoid the cast, e.g. `RowDataTaskWriterFactory`, hence just pointing it out:
```
static IcebergStreamWriter<RowData> createStreamWriter(
    Table table,
    FlinkWriteConf flinkWriteConf,
    RowType flinkRowType,
    List<Integer> equalityFieldIds) {
  Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");

  Table serializableTable = SerializableTable.copyOf(table);
  TaskWriterFactory<RowData> taskWriterFactory =
      new RowDataTaskWriterFactory(
          serializableTable,
          flinkRowType,
          flinkWriteConf.targetDataFileSize(),
          flinkWriteConf.dataFileFormat(),
          equalityFieldIds,
          flinkWriteConf.upsertMode());

  return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
```
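For contrast, a minimal hypothetical sketch of the two signature styles (the `TableHolder` class is made up for illustration; only `SerializableTable.copyOf` being declared to return `Table`, and hence the cast, comes from the actual code):
```
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;

// Hypothetical holder class, only to contrast the two arg-type choices.
class TableHolder {
  private final Table table;

  // Style A (FlinkInputFormat in this PR): declare SerializableTable so the
  // serializability requirement is visible in the signature. Call sites need
  // a cast, since SerializableTable.copyOf is declared to return Table:
  //   new TableHolder((SerializableTable) SerializableTable.copyOf(table));
  TableHolder(SerializableTable table) {
    this.table = table;
  }

  // Style B (RowDataTaskWriterFactory): declare plain Table, no cast needed:
  //   new TableHolder(SerializableTable.copyOf(table));
  // but nothing in the type system stops a caller passing a live table.
  TableHolder(Table table) {
    this.table = table;
  }
}
```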
##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java:
##########
@@ -41,28 +39,19 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
private static final long serialVersionUID = 1L;
- private final TableLoader tableLoader;
- private final FileIO io;
- private final EncryptionManager encryption;
private final ScanContext context;
private final RowDataFileScanTaskReader rowDataReader;
+ private final Table table;
Review Comment:
nit: put this before `context`, following the same order of usage below.
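i.e., just reordering the declarations from the hunk above (sketch):
```
private final Table table;
private final ScanContext context;
private final RowDataFileScanTaskReader rowDataReader;
```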
##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java:
##########
@@ -18,53 +18,45 @@
*/
package org.apache.iceberg.flink.source.reader;
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
- private final Schema tableSchema;
private final Schema readSchema;
- private final String nameMapping;
private final boolean caseSensitive;
- private final FileIO io;
- private final EncryptionManager encryption;
+ private final Table table;
Review Comment:
nit: keep the field declarations in the same order as the constructor arg usage
##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java:
##########
@@ -189,25 +188,18 @@ public FlinkInputFormat buildFormat() {
Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
Schema icebergSchema;
- FileIO io;
- EncryptionManager encryption;
if (table == null) {
// load required fields by table loader.
tableLoader.open();
try (TableLoader loader = tableLoader) {
table = loader.loadTable();
- icebergSchema = table.schema();
- io = table.io();
- encryption = table.encryption();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
- } else {
- icebergSchema = table.schema();
- io = table.io();
- encryption = table.encryption();
}
+ icebergSchema = table.schema();
+
Review Comment:
nit: empty line not needed?
##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java:
##########
@@ -77,20 +65,12 @@ public RowDataRewriter(
RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
this.taskWriterFactory =
new RowDataTaskWriterFactory(
- SerializableTable.copyOf(table),
Review Comment:
`table` seems like a regular table if we trace back the code, so we may not be able to change this.
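If so, this call site probably needs to keep making the serializable copy itself, roughly like this (the args after `flinkSchema` are placeholders for whatever the existing constructor call passes):
```
// Sketch: `table` comes from a live catalog load here, so make a
// serializable copy before handing it to the task writer factory.
this.taskWriterFactory =
    new RowDataTaskWriterFactory(
        SerializableTable.copyOf(table),
        flinkSchema,
        targetFileSize,    // placeholder
        fileFormat,        // placeholder
        equalityFieldIds,  // placeholder
        upsertMode);       // placeholder
```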
##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderFunctionTestBase.java:
##########
@@ -51,16 +55,28 @@ public static Object[][] parameters() {
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+ public static final HadoopTables tables = new HadoopTables(new Configuration());
Review Comment:
can we use `HadoopTableResource` instead?
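Untested sketch of what that could look like, assuming the `HadoopTableResource(TemporaryFolder, database, table, schema)` constructor and the usual `TestFixtures` constants (substitute whatever schema/identifiers this test actually uses):
```
// Sketch: HadoopTableResource creates and drops the table around the test,
// so the explicit HadoopTables + Configuration setup goes away.
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

@Rule
public final HadoopTableResource tableResource =
    new HadoopTableResource(
        TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA);
```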
##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java:
##########
@@ -18,53 +18,45 @@
*/
package org.apache.iceberg.flink.source.reader;
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
- private final Schema tableSchema;
private final Schema readSchema;
- private final String nameMapping;
private final boolean caseSensitive;
- private final FileIO io;
- private final EncryptionManager encryption;
+ private final Table table;
public RowDataReaderFunction(
- ReadableConfig config,
- Schema tableSchema,
- Schema projectedSchema,
- String nameMapping,
- boolean caseSensitive,
- FileIO io,
- EncryptionManager encryption) {
+ Table table, ReadableConfig config, Schema projectedSchema, boolean caseSensitive) {
Review Comment:
if we want to be consistent, this should be `SerializableTable`
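i.e., roughly this signature (sketch; the constructor body stays as-is):
```
// Sketch: typing the first arg as SerializableTable makes the
// serializability requirement explicit, matching FlinkInputFormat in this PR.
public RowDataReaderFunction(
    SerializableTable table, ReadableConfig config, Schema projectedSchema, boolean caseSensitive) {
  // ... existing body ...
}
```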