JingsongLi commented on code in PR #460:
URL: https://github.com/apache/flink-table-store/pull/460#discussion_r1062399865
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java:
##########
@@ -39,6 +40,14 @@ interface StoreSinkWrite {
void compact(BinaryRowData partition, int bucket, boolean fullCompaction)
throws Exception;
+ void compact(
Review Comment:
Can we just add an `addNewFiles(snapshotId, files)` method here?
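For illustration, a minimal sketch of what such a method could look like; the partition/bucket parameters and the `throws` clause are assumptions mirroring the existing `compact` signature, not a final API:
```java
// Sketch only: parameter list mirrors compact() above and is an assumption.
// Hands the new data files produced in a snapshot to the writer.
void addNewFiles(
        long snapshotId,
        BinaryRowData partition,
        int bucket,
        List<DataFileMeta> files)
        throws Exception;
```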
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java:
##########
@@ -73,17 +76,29 @@ public void initializeState(StateInitializationContext context) throws Exception
public void open() throws Exception {
super.open();
partitionSerializer = new RowDataSerializer(table.schema().logicalPartitionType());
- reusedPartition = new OffsetRowData(partitionSerializer.getArity(), 0);
+ reusedPartition = new OffsetRowData(partitionSerializer.getArity(), 1);
+ dataFileMetaSerializer = new DataFileMetaSerializer();
}
@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
- RowData partitionAndBucket = element.getValue();
- reusedPartition.replace(partitionAndBucket);
+ RowData record = element.getValue();
+
+ long snapshotId = record.getLong(0);
+
+ reusedPartition.replace(record);
BinaryRowData partition = partitionSerializer.toBinaryRow(reusedPartition).copy();
- int bucket = partitionAndBucket.getInt(partitionSerializer.getArity());
- write.compact(partition, bucket, !isStreaming);
+ int bucket = record.getInt(partitionSerializer.getArity() + 1);
+
+ byte[] serializedFiles = record.getBinary(partitionSerializer.getArity() + 2);
+ List<DataFileMeta> files = dataFileMetaSerializer.deserializeList(serializedFiles);
+
+ if (isStreaming) {
+ write.compact(snapshotId, partition, bucket, false, files);
+ } else {
+ write.compact(partition, bucket, true);
Review Comment:
Should we check that `files` is empty here, before calling `write.compact(partition, bucket, true)`?
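For example, a sketch of such a guard, assuming `org.apache.flink.util.Preconditions` is usable in this class:
```java
// Sketch: fail fast if the batch path unexpectedly receives new files.
// Assumes org.apache.flink.util.Preconditions is imported here.
Preconditions.checkArgument(
        files.isEmpty(),
        "Batch compaction should not receive new files, but got %s.",
        files.size());
```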
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java:
##########
@@ -199,56 +215,63 @@ public List<FileCommittable> prepareCommit(boolean blocking, long commitIdentifi
@Override
public void close() throws Exception {
- for (Map<Integer, WriterWithCommit<T>> bucketWriters : writers.values()) {
- for (WriterWithCommit<T> writerWithCommit : bucketWriters.values()) {
- writerWithCommit.writer.close();
+ for (Map<Integer, WriterWrapper<T>> bucketWriters : writers.values()) {
+ for (WriterWrapper<T> writerWrapper : bucketWriters.values()) {
+ writerWrapper.writer.close();
}
}
writers.clear();
compactExecutor.shutdownNow();
}
- private RecordWriter<T> getWriter(BinaryRowData partition, int bucket) {
- Map<Integer, WriterWithCommit<T>> buckets = writers.get(partition);
+ private WriterWrapper<T> getWriterWrapper(BinaryRowData partition, int bucket) {
+ Map<Integer, WriterWrapper<T>> buckets = writers.get(partition);
if (buckets == null) {
buckets = new HashMap<>();
writers.put(partition.copy(), buckets);
}
- return buckets.computeIfAbsent(bucket, k -> createWriter(partition.copy(), bucket)).writer;
+ return buckets.computeIfAbsent(bucket, k -> createWriterWrapper(partition.copy(), bucket));
}
- private WriterWithCommit<T> createWriter(BinaryRowData partition, int bucket) {
+ private WriterWrapper<T> createWriterWrapper(BinaryRowData partition, int bucket) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating writer for partition {}, bucket {}", partition, bucket);
}
- RecordWriter<T> writer =
+ WriterWrapper<T> writerWrapper =
overwrite
- ? createEmptyWriter(partition.copy(), bucket, compactExecutor)
- : createWriter(partition.copy(), bucket, compactExecutor);
- notifyNewWriter(writer);
- return new WriterWithCommit<>(writer);
+ ? createEmptyWriterWrapper(partition.copy(), bucket, compactExecutor)
+ : createWriterWrapper(partition.copy(), bucket, compactExecutor);
+ notifyNewWriter(writerWrapper.writer);
+ return writerWrapper;
}
protected void notifyNewWriter(RecordWriter<T> writer) {}
/** Create a {@link RecordWriter} from partition and bucket. */
@VisibleForTesting
- public abstract RecordWriter<T> createWriter(
+ public abstract WriterWrapper<T> createWriterWrapper(
BinaryRowData partition, int bucket, ExecutorService compactExecutor);
/** Create an empty {@link RecordWriter} from partition and bucket. */
@VisibleForTesting
- public abstract RecordWriter<T> createEmptyWriter(
+ public abstract WriterWrapper<T> createEmptyWriterWrapper(
BinaryRowData partition, int bucket, ExecutorService compactExecutor);
- /** {@link RecordWriter} with identifier of its last modified commit. */
- protected static class WriterWithCommit<T> {
+ /**
+ * {@link RecordWriter} with the snapshot id it is created upon and the identifier of its last
+ * modified commit.
+ */
+ @VisibleForTesting
+ public static class WriterWrapper<T> {
Review Comment:
How about renaming this to `WriterContainer`?
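For reference, a minimal sketch of the renamed class; the field names are assumptions derived from the javadoc above:
```java
/** Sketch of the proposed rename; holds the fields described in the javadoc. */
public static class WriterContainer<T> {
    public final RecordWriter<T> writer;
    // snapshot id the writer was created upon (assumed field name)
    public final long baseSnapshotId;
    // identifier of the writer's last modified commit (assumed field name)
    public long lastModifiedCommitIdentifier;

    public WriterContainer(RecordWriter<T> writer, long baseSnapshotId) {
        this.writer = writer;
        this.baseSnapshotId = baseSnapshotId;
    }
}
```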