JingsongLi commented on code in PR #460:
URL: https://github.com/apache/flink-table-store/pull/460#discussion_r1062399865


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWrite.java:
##########
@@ -39,6 +40,14 @@ interface StoreSinkWrite {
 
     void compact(BinaryRowData partition, int bucket, boolean fullCompaction) throws Exception;
 
+    void compact(

Review Comment:
   Can we just add an `addNewFiles(snapshotId, files)` method here?
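   A rough sketch of how such a method might look, purely as an illustration: the extra `partition`/`bucket` parameters and the import paths are guesses on my part, not the PR's actual code.

   ```java
   // Hypothetical sketch of the suggested StoreSinkWrite addition; the parameter list
   // beyond snapshotId/files and the import paths are assumptions.
   import java.util.List;

   import org.apache.flink.table.data.binary.BinaryRowData;
   import org.apache.flink.table.store.file.data.DataFileMeta; // package path assumed

   interface StoreSinkWrite {

       void compact(BinaryRowData partition, int bucket, boolean fullCompaction) throws Exception;

       /** Hand the files committed at the given snapshot to the writer of the target bucket. */
       void addNewFiles(long snapshotId, BinaryRowData partition, int bucket, List<DataFileMeta> files)
               throws Exception;
   }
   ```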



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java:
##########
@@ -73,17 +76,29 @@ public void initializeState(StateInitializationContext context) throws Exception
     public void open() throws Exception {
         super.open();
         partitionSerializer = new RowDataSerializer(table.schema().logicalPartitionType());
-        reusedPartition = new OffsetRowData(partitionSerializer.getArity(), 0);
+        reusedPartition = new OffsetRowData(partitionSerializer.getArity(), 1);
+        dataFileMetaSerializer = new DataFileMetaSerializer();
     }
 
     @Override
     public void processElement(StreamRecord<RowData> element) throws Exception {
-        RowData partitionAndBucket = element.getValue();
-        reusedPartition.replace(partitionAndBucket);
+        RowData record = element.getValue();
+
+        long snapshotId = record.getLong(0);
+
+        reusedPartition.replace(record);
         BinaryRowData partition = partitionSerializer.toBinaryRow(reusedPartition).copy();
-        int bucket = partitionAndBucket.getInt(partitionSerializer.getArity());
 
-        write.compact(partition, bucket, !isStreaming);
+        int bucket = record.getInt(partitionSerializer.getArity() + 1);
+
+        byte[] serializedFiles = record.getBinary(partitionSerializer.getArity() + 2);
+        List<DataFileMeta> files = dataFileMetaSerializer.deserializeList(serializedFiles);
+
+        if (isStreaming) {
+            write.compact(snapshotId, partition, bucket, false, files);
+        } else {
+            write.compact(partition, bucket, true);

Review Comment:
   Should we check that `files` is empty here?
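   One possible shape for that check, as a sketch only: the use of Flink's `Preconditions` and the error message are my own, not necessarily what the PR will end up doing.

   ```java
   // Sketch: batch (full) compaction rewrites the bucket on its own, so the incoming
   // record is not expected to carry pre-planned files; fail fast if it does.
   if (isStreaming) {
       write.compact(snapshotId, partition, bucket, false, files);
   } else {
       org.apache.flink.util.Preconditions.checkArgument(
               files.isEmpty(),
               "Batch compaction should not receive pre-planned files, but got %s files "
                       + "for partition %s, bucket %s.",
               files.size(), partition, bucket);
       write.compact(partition, bucket, true);
   }
   ```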



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java:
##########
@@ -199,56 +215,63 @@ public List<FileCommittable> prepareCommit(boolean blocking, long commitIdentifi
 
     @Override
     public void close() throws Exception {
-        for (Map<Integer, WriterWithCommit<T>> bucketWriters : writers.values()) {
-            for (WriterWithCommit<T> writerWithCommit : bucketWriters.values()) {
-                writerWithCommit.writer.close();
+        for (Map<Integer, WriterWrapper<T>> bucketWriters : writers.values()) {
+            for (WriterWrapper<T> writerWrapper : bucketWriters.values()) {
+                writerWrapper.writer.close();
             }
         }
         writers.clear();
         compactExecutor.shutdownNow();
     }
 
-    private RecordWriter<T> getWriter(BinaryRowData partition, int bucket) {
-        Map<Integer, WriterWithCommit<T>> buckets = writers.get(partition);
+    private WriterWrapper<T> getWriterWrapper(BinaryRowData partition, int bucket) {
+        Map<Integer, WriterWrapper<T>> buckets = writers.get(partition);
         if (buckets == null) {
             buckets = new HashMap<>();
             writers.put(partition.copy(), buckets);
         }
-        return buckets.computeIfAbsent(bucket, k -> createWriter(partition.copy(), bucket)).writer;
+        return buckets.computeIfAbsent(bucket, k -> createWriterWrapper(partition.copy(), bucket));
     }
 
-    private WriterWithCommit<T> createWriter(BinaryRowData partition, int bucket) {
+    private WriterWrapper<T> createWriterWrapper(BinaryRowData partition, int bucket) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Creating writer for partition {}, bucket {}", partition, bucket);
         }
-        RecordWriter<T> writer =
+        WriterWrapper<T> writerWrapper =
                 overwrite
-                        ? createEmptyWriter(partition.copy(), bucket, compactExecutor)
-                        : createWriter(partition.copy(), bucket, compactExecutor);
-        notifyNewWriter(writer);
-        return new WriterWithCommit<>(writer);
+                        ? createEmptyWriterWrapper(partition.copy(), bucket, compactExecutor)
+                        : createWriterWrapper(partition.copy(), bucket, compactExecutor);
+        notifyNewWriter(writerWrapper.writer);
+        return writerWrapper;
     }
 
     protected void notifyNewWriter(RecordWriter<T> writer) {}
 
     /** Create a {@link RecordWriter} from partition and bucket. */
     @VisibleForTesting
-    public abstract RecordWriter<T> createWriter(
+    public abstract WriterWrapper<T> createWriterWrapper(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor);
 
     /** Create an empty {@link RecordWriter} from partition and bucket. */
     @VisibleForTesting
-    public abstract RecordWriter<T> createEmptyWriter(
+    public abstract WriterWrapper<T> createEmptyWriterWrapper(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor);
 
-    /** {@link RecordWriter} with identifier of its last modified commit. */
-    protected static class WriterWithCommit<T> {
+    /**
+     * {@link RecordWriter} with the snapshot id it is created upon and the identifier of its last
+     * modified commit.
+     */
+    @VisibleForTesting
+    public static class WriterWrapper<T> {

Review Comment:
   How about renaming it to `WriterContainer`?
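   For illustration, one possible shape of the renamed holder, inferred from the javadoc in this diff; the field names and constructor here are guesses, not the PR's actual code.

   ```java
   // Sketch of the suggested rename: a container pairing a RecordWriter with the
   // snapshot it was created upon and the identifier of its last modified commit.
   @VisibleForTesting
   public static class WriterContainer<T> {
       final RecordWriter<T> writer;
       final long baseSnapshotId;
       long lastModifiedCommitIdentifier;

       WriterContainer(RecordWriter<T> writer, long baseSnapshotId) {
           this.writer = writer;
           this.baseSnapshotId = baseSnapshotId;
       }
   }
   ```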



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
