tsreaper commented on code in PR #5787:
URL: https://github.com/apache/paimon/pull/5787#discussion_r2162967591


##########
paimon-core/src/main/java/org/apache/paimon/operation/write/FileSystemWriteRestore.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.write;

Review Comment:
   No need for a new package. There are many other classes related to writing. Why don't you put them all into this package?



##########
paimon-core/src/main/java/org/apache/paimon/operation/write/RestoreFiles.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.write;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.DataFileMeta;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** Restored files with snapshot and total buckets. */
+public class RestoreFiles {
+
+    private final @Nullable Snapshot snapshot;
+    private final @Nullable Integer totalBuckets;
+    private final @Nullable List<DataFileMeta> dataFiles;
+    private final @Nullable IndexFileMeta dynamicBucketIndex;
+    private final @Nullable List<IndexFileMeta> deleteVectorsIndex;
+
+    public RestoreFiles(
+            @Nullable Snapshot snapshot,
+            @Nullable Integer totalBuckets,
+            @Nullable List<DataFileMeta> dataFiles,
+            @Nullable IndexFileMeta dynamicBucketIndex,
+            @Nullable List<IndexFileMeta> deleteVectorsIndex) {
+        this.snapshot = snapshot;
+        this.totalBuckets = totalBuckets;
+        this.dataFiles = dataFiles;
+        this.dynamicBucketIndex = dynamicBucketIndex;
+        this.deleteVectorsIndex = deleteVectorsIndex;
+    }
+
+    @Nullable
+    public Snapshot snapshot() {
+        return snapshot;
+    }
+
+    @Nullable
+    public Integer totalBuckets() {
+        return totalBuckets;
+    }
+
+    public List<DataFileMeta> dataFiles() {

Review Comment:
   `@Nullable`
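   For example (just a sketch, mirroring the field declaration above):
   
   ```java
   @Nullable
   public List<DataFileMeta> dataFiles() {
       return dataFiles;
   }
   ```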



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.coordinator;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.TableWriteOperator;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.write.WriteRestore;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SegmentsCache;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_MEMORY;
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
+
+/**
+ * {@link OperatorCoordinator} for {@link TableWriteOperator}, to use a single point to obtain the
+ * list of initialization files required for write operators.
+ */
+public class WriteOperatorCoordinator implements OperatorCoordinator, CoordinationRequestHandler {
+
+    private final FileStoreTable table;
+
+    private ThreadPoolExecutor executor;
+    private Map<String, Long> latestCommittedIdentifiers;
+
+    private volatile Snapshot snapshot;
+    private volatile FileStoreScan dataFileScan;
+    private volatile IndexFileHandler indexFileHandler;
+
+    public WriteOperatorCoordinator(FileStoreTable table) {
+        this.table = table;
+    }
+
+    private synchronized void refreshOrCreateScan() {
+        Optional<Snapshot> latestSnapshot = table.latestSnapshot();
+        if (!latestSnapshot.isPresent()) {
+            return;
+        }
+        if (dataFileScan == null) {
+            dataFileScan = table.store().newScan();
+            if (table.coreOptions().manifestDeleteFileDropStats()) {
+                dataFileScan.dropStats();
+            }
+        }
+        if (indexFileHandler == null) {
+            indexFileHandler = table.store().newIndexFileHandler();
+        }
+        snapshot = latestSnapshot.get();
+        dataFileScan.withSnapshot(snapshot);
+    }
+
+    private synchronized ScanCoordinationResponse scanDataFiles(ScanCoordinationRequest request)
+            throws IOException {
+        if (snapshot == null) {
+            return new ScanCoordinationResponse(null, null, null, null, null);
+        }
+
+        BinaryRow partition = deserializeBinaryRow(request.partition());
+        int bucket = request.bucket();
+
+        List<DataFileMeta> restoreFiles = new ArrayList<>();
+        List<ManifestEntry> entries =
+                dataFileScan.withPartitionBucket(partition, bucket).plan().files();
+        Integer totalBuckets = WriteRestore.extractDataFiles(entries, restoreFiles);
+
+        IndexFileMeta dynamicBucketIndex = null;
+        if (request.scanDynamicBucketIndex()) {
+            dynamicBucketIndex =
+                    indexFileHandler.scanHashIndex(snapshot, partition, bucket).orElse(null);
+        }
+
+        List<IndexFileMeta> deleteVectorsIndex = null;
+        if (request.scanDeleteVectorsIndex()) {
+            deleteVectorsIndex =
+                    indexFileHandler.scan(snapshot, DELETION_VECTORS_INDEX, partition, bucket);
+        }
+
+        return new ScanCoordinationResponse(
+                snapshot, totalBuckets, restoreFiles, dynamicBucketIndex, deleteVectorsIndex);
+    }
+
+    private synchronized LatestIdentifierResponse latestCommittedIdentifier(
+            LatestIdentifierRequest request) {
+        String user = request.user();
+        long identifier =
+                latestCommittedIdentifiers.computeIfAbsent(
+                        user,
+                        k ->
+                                table.snapshotManager()
+                                        .latestSnapshotOfUser(user)
+                                        .map(Snapshot::commitIdentifier)
+                                        .orElse(Long.MIN_VALUE));
+        return new LatestIdentifierResponse(identifier);
+    }

Review Comment:
   `snapshot` must be at least as new as `latestSnapshotOfUser`. If `latestSnapshotOfUser` is updated, `snapshot` and `dataFileScan` must also be updated; otherwise there will be consistency problems.
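   
   One possible direction (rough sketch, not tested): resolve the user's latest snapshot first, and refresh the scan whenever `snapshot` is missing or older than it, e.g.
   
   ```java
   private synchronized LatestIdentifierResponse latestCommittedIdentifier(
           LatestIdentifierRequest request) {
       String user = request.user();
       Optional<Snapshot> userSnapshot = table.snapshotManager().latestSnapshotOfUser(user);
       // keep `snapshot` / `dataFileScan` at least as new as the snapshot the identifier
       // is resolved from, otherwise later scans may miss this user's committed files
       if (userSnapshot.isPresent()
               && (snapshot == null || snapshot.id() < userSnapshot.get().id())) {
           refreshOrCreateScan();
       }
       long identifier =
               latestCommittedIdentifiers.computeIfAbsent(
                       user,
                       k -> userSnapshot.map(Snapshot::commitIdentifier).orElse(Long.MIN_VALUE));
       return new LatestIdentifierResponse(identifier);
   }
   ```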



##########
paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java:
##########
@@ -94,20 +98,19 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
     protected AbstractFileStoreWrite(
             SnapshotManager snapshotManager,
             FileStoreScan scan,
-            @Nullable IndexMaintainer.Factory<T> indexFactory,
+            @Nullable DynamicBucketIndexMaintainer.Factory dbMaintainerFactory,
             @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
             String tableName,
             CoreOptions options,
             RowType partitionType) {
-        this.snapshotManager = snapshotManager;
-        this.scan = scan;
-        // Statistic is useless in writer
-        if (options.manifestDeleteFileDropStats()) {
-            if (this.scan != null) {
-                this.scan.dropStats();
-            }
+        IndexFileHandler indexFileHandler = null;
+        if (dbMaintainerFactory != null) {
+            indexFileHandler = dbMaintainerFactory.indexFileHandler();
+        } else if (dvMaintainerFactory != null) {
+            indexFileHandler = dvMaintainerFactory.indexFileHandler();
         }
-        this.indexFactory = indexFactory;
+        this.restore = new FileSystemWriteRestore(options, snapshotManager, scan, indexFileHandler);

Review Comment:
   Set `FileSystemWriteRestore` in the `newWrite` methods of the `FileStore`s and remove `scan` and `snapshotManager` from this constructor. Some writers (like unaware-bucket writers for append-only tables) don't need a scan at all.
   
   Also consider removing `dbMaintainerFactory` and `dvMaintainerFactory` from the constructor? Many file stores don't need them either.
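   
   Roughly what I have in mind (a sketch only, exact accessor names may differ):
   
   ```java
   // inside a concrete FileStore#newWrite (sketch, not tested):
   FileStoreScan scan = newScan();
   if (options.manifestDeleteFileDropStats()) {
       // statistics are useless in writers
       scan.dropStats();
   }
   WriteRestore restore =
           new FileSystemWriteRestore(options, snapshotManager(), scan, newIndexFileHandler());
   // the write constructor then only receives `restore`; writers that never scan
   // (e.g. unaware-bucket append writers) could be given a no-op WriteRestore instead
   ```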



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@paimon.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
