aokolnychyi commented on a change in pull request #2923:
URL: https://github.com/apache/iceberg/pull/2923#discussion_r681365171
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
##########
@@ -40,51 +38,44 @@
import org.apache.iceberg.util.ArrayUtil;
public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
+ private final Table table;
private final Schema schema;
private final RowType flinkSchema;
private final PartitionSpec spec;
- private final LocationProvider locations;
private final FileIO io;
- private final EncryptionManager encryptionManager;
private final long targetFileSizeBytes;
private final FileFormat format;
private final List<Integer> equalityFieldIds;
private final FileAppenderFactory<RowData> appenderFactory;
private transient OutputFileFactory outputFileFactory;
- public RowDataTaskWriterFactory(Schema schema,
+ public RowDataTaskWriterFactory(Table table,
RowType flinkSchema,
- PartitionSpec spec,
- LocationProvider locations,
- FileIO io,
- EncryptionManager encryptionManager,
long targetFileSizeBytes,
FileFormat format,
- Map<String, String> tableProperties,
List<Integer> equalityFieldIds) {
- this.schema = schema;
+ this.table = table;
+ this.schema = table.schema();
this.flinkSchema = flinkSchema;
- this.spec = spec;
- this.locations = locations;
- this.io = io;
- this.encryptionManager = encryptionManager;
+ this.spec = table.spec();
+ this.io = table.io();
this.targetFileSizeBytes = targetFileSizeBytes;
this.format = format;
this.equalityFieldIds = equalityFieldIds;
if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
- this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema,
tableProperties, spec);
+ this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema,
table.properties(), spec);
} else {
// TODO provide the ability to customize the equality-delete row schema.
- this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema,
tableProperties, spec,
+ this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema,
table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), schema, null);
}
}
@Override
public void initialize(int taskId, int attemptId) {
- this.outputFileFactory = new OutputFileFactory(spec, format, locations,
io, encryptionManager, taskId, attemptId);
+ this.outputFileFactory = OutputFileFactory.builderFor(table, taskId,
attemptId).build();
Review comment:
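I think this was the only place left that still used the old `OutputFileFactory` constructor instead of `OutputFileFactory.builderFor(...)`.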
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]