JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r891971268


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** A dedicated {@link SinkWriter} for manually triggered compaction. */
+public class StoreSinkCompactor<WriterStateT> extends StoreSinkWriterBase<WriterStateT> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StoreSinkCompactor.class);
+
+    private final int subTaskId;
+    private final int numOfParallelInstances;
+
+    private final FileStore fileStore;
+    private final Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> partitionedMeta;
+    private final Map<String, String> partitionSpec;
+
+    public StoreSinkCompactor(
+            int subTaskId,
+            int numOfParallelInstances,
+            FileStore fileStore,
+            Map<String, String> partitionSpec) {
+        this.subTaskId = subTaskId;
+        this.numOfParallelInstances = numOfParallelInstances;
+        this.fileStore = fileStore;
+        this.partitionSpec = partitionSpec;
+        this.partitionedMeta = new HashMap<>();
+    }
+
+    @Override
+    public void flush(boolean endOfInput) {
+        if (endOfInput) {
+            FileStoreScan.Plan plan =
+                    fileStore
+                            .newScan()
+                            .withPartitionFilter(
+                                    PredicateConverter.CONVERTER.fromMap(
+                                            partitionSpec, fileStore.partitionType()))
+                            .plan();
+            for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                    plan.groupByPartFiles().entrySet()) {
+                BinaryRowData partition = partEntry.getKey();
+                for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                        partEntry.getValue().entrySet()) {
+                    int bucket = bucketEntry.getKey();
+                    if (select(partition, bucket)) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(
+                                    "Assign partition {}, bucket {} to subtask {}",
+                                    FileStorePathFactory.getPartitionComputer(
+                                                    fileStore.partitionType(),
+                                                    FileSystemConnectorOptions
+                                                            .PARTITION_DEFAULT_NAME
+                                                            .defaultValue())
+                                            .generatePartValues(partition),
+                                    bucket,
+                                    subTaskId);
+                        }
+                        partitionedMeta

Review Comment:
   You don't need to store this metadata in a class member. You can just create a Writer here.
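   A minimal sketch of that shape, assuming a hypothetical `createCompactWriter(partition, bucket, restoredFiles)` factory on `FileStore#newWrite()` (that method does not exist yet, the name is only illustrative):

   ```java
   // Sketch only: create the writer on the fly per (partition, bucket) instead of
   // collecting the scanned files into the partitionedMeta field first.
   for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
           plan.groupByPartFiles().entrySet()) {
       BinaryRowData partition = partEntry.getKey();
       for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry : partEntry.getValue().entrySet()) {
           int bucket = bucketEntry.getKey();
           if (!select(partition, bucket)) {
               continue;
           }
           // "createCompactWriter" is a placeholder name for a writer that only rewrites
           // the restored files of this partition/bucket.
           RecordWriter writer =
                   fileStore
                           .newWrite()
                           .createCompactWriter(partition, bucket, bucketEntry.getValue());
           // ... close the writer and collect its committable right here ...
       }
   }
   ```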



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriterBase.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The base class for file store sink writers.
+ *
+ * @param <WriterStateT> The type of the writer's state.
+ */
+public abstract class StoreSinkWriterBase<WriterStateT>

Review Comment:
   `StoreSinkWriterBase` doesn't seem to help `StoreSinkCompactor` much. You 
can see my comments in `StoreSinkCompactor`.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** A dedicated {@link SinkWriter} for manually triggered compaction. */
+public class StoreSinkCompactor<WriterStateT> extends StoreSinkWriterBase<WriterStateT> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StoreSinkCompactor.class);
+
+    private final int subTaskId;
+    private final int numOfParallelInstances;
+
+    private final FileStore fileStore;
+    private final Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> partitionedMeta;
+    private final Map<String, String> partitionSpec;
+
+    public StoreSinkCompactor(
+            int subTaskId,
+            int numOfParallelInstances,
+            FileStore fileStore,
+            Map<String, String> partitionSpec) {
+        this.subTaskId = subTaskId;
+        this.numOfParallelInstances = numOfParallelInstances;
+        this.fileStore = fileStore;
+        this.partitionSpec = partitionSpec;
+        this.partitionedMeta = new HashMap<>();
+    }
+
+    @Override
+    public void flush(boolean endOfInput) {
+        if (endOfInput) {
+            FileStoreScan.Plan plan =
+                    fileStore
+                            .newScan()
+                            .withPartitionFilter(
+                                    PredicateConverter.CONVERTER.fromMap(
+                                            partitionSpec, fileStore.partitionType()))
+                            .plan();
+            for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                    plan.groupByPartFiles().entrySet()) {
+                BinaryRowData partition = partEntry.getKey();
+                for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                        partEntry.getValue().entrySet()) {
+                    int bucket = bucketEntry.getKey();
+                    if (select(partition, bucket)) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(
+                                    "Assign partition {}, bucket {} to subtask {}",
+                                    FileStorePathFactory.getPartitionComputer(
+                                                    fileStore.partitionType(),
+                                                    FileSystemConnectorOptions
+                                                            .PARTITION_DEFAULT_NAME
+                                                            .defaultValue())
+                                            .generatePartValues(partition),
+                                    bucket,
+                                    subTaskId);
+                        }
+                        partitionedMeta
+                                .computeIfAbsent(partition, k -> new HashMap<>())
+                                .computeIfAbsent(bucket, k -> new ArrayList<>())
+                                .addAll(bucketEntry.getValue());
+                        RecordWriter writer = getWriter(partition, bucket);
+                        try {
+                            writer.flush();

Review Comment:
   You can just close the writer and create the `FileCommittable`.
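   For example (a sketch only; it assumes `RecordWriter#prepareCommit()` returns an `Increment` and that `FileCommittable` is built from partition, bucket and increment, so adjust to the real signatures):

   ```java
   // Sketch only: finish the writer immediately and turn its result into a committable,
   // instead of flush() plus extra bookkeeping. Signatures below are assumptions.
   RecordWriter writer = getWriter(partition, bucket);
   try {
       Increment increment = writer.prepareCommit();
       // committables is assumed to be a local List<FileCommittable> built up by flush()
       committables.add(new FileCommittable(partition, bucket, increment));
   } finally {
       writer.close();
   }
   ```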



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -79,6 +82,7 @@ public FileStoreSource(
             boolean isContinuous,
             long discoveryInterval,
             boolean latestContinuous,
+            boolean nonRescaleCompact,

Review Comment:
   Can we keep this class as it is?
   We can just create an empty `FromElementsSource` for this?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java:
##########
@@ -105,6 +105,9 @@ public void sync() throws Exception {
         // Do nothing here, as this writer don't introduce any async compaction thread currently.
     }
 
+    @Override
+    public void flush() throws Exception {}

Review Comment:
   endInput?
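   If this is about naming, one possible shape (a sketch; `endInput` is only a suggested name, not an existing `RecordWriter` method):

   ```java
   // Sketch only: an explicit end-of-input hook instead of an argument-less flush().
   @Override
   public void endInput() throws Exception {
       // AppendOnlyWriter has no async compaction to finish, so nothing to do here.
   }
   ```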



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
