This is an automated email from the ASF dual-hosted git repository.

loogn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5307fcb0 [ISSUE-622] feat: support distributed writing with one Paimon graph store (#623)
5307fcb0 is described below

commit 5307fcb0f359915057d2b4d1426428c3cc3e4569
Author: Qingwen Zhao <[email protected]>
AuthorDate: Thu Sep 25 15:11:53 2025 +0800

    [ISSUE-622] feat: support distributed writing with one Paimon graph store (#623)
    
    * feat: support paimon store in distributed mode
    
    * update
    
    * fix checkstyle
    
    * update
    
    * fix ut
    
    * update
    
    * disable dynamic traversal with data appending
---
 .../java/org/apache/geaflow/view/IViewDesc.java    |   2 +
 .../graph/materialize/GraphViewMaterializeOp.java  |  33 ++++-
 .../impl/graph/materialize/PaimonGlobalSink.java   | 139 +++++++++++++++++++
 .../view/materialize/MaterializedIncGraph.java     |  16 ++-
 .../pdata/stream/window/WindowStreamSink.java      |   3 +-
 .../apache/geaflow/plan/PipelinePlanBuilder.java   | 153 +++++++++++++++------
 .../org/apache/geaflow/plan/PipelinePlanTest.java  |  32 +++--
 .../dsl/runtime/engine/GeaFlowRuntimeGraph.java    |   9 ++
 .../geaflow/dsl/runtime/query/GQLInsertTest.java   |  22 +++
 .../resources/expect/gql_insert_and_graph_006.txt  |   0
 .../resources/query/gql_insert_and_graph_006.sql   |  70 ++++++++++
 .../resources/query/gql_insert_and_graph_007.sql   |  75 ++++++++++
 .../geaflow/store/paimon/BasePaimonGraphStore.java |  54 +++++++-
 .../geaflow/store/paimon/BasePaimonStore.java      |  43 ++++--
 .../store/paimon/DynamicGraphPaimonStoreBase.java  |  15 +-
 .../apache/geaflow/store/paimon/KVPaimonStore.java |   4 +-
 ...CatalogClient.java => PaimonCatalogClient.java} |  49 ++-----
 .../geaflow/store/paimon/PaimonCatalogManager.java |  71 ++++++++++
 .../geaflow/store/paimon/PaimonConfigKeys.java     |  43 ------
 .../geaflow/store/paimon/PaimonTableRWHandle.java  |  71 ++++++++--
 .../store/paimon/StaticGraphPaimonStoreBase.java   |  11 +-
 .../store/paimon/commit/PaimonCommitRegistry.java  |  80 +++++++++++
 .../geaflow/store/paimon/commit/PaimonMessage.java |  89 ++++++++++++
 .../store/paimon/config/PaimonConfigKeys.java      |  74 ++++++++++
 .../store/paimon/config/PaimonStoreConfig.java     |  46 +++++++
 .../store/paimon/proxy/PaimonBaseGraphProxy.java   |   4 +-
 .../proxy/PaimonGraphMultiVersionedProxy.java      |   6 +-
 .../geaflow/store/paimon/PaimonRWHandleTest.java   |   2 +-
 .../geaflow/state/PaimonDynamicGraphStateTest.java |  28 ++--
 .../apache/geaflow/state/PaimonGraphStateTest.java |  37 +++--
 .../apache/geaflow/state/PaimonKeyStateTest.java   |  15 +-
 geaflow/pom.xml                                    |   2 +-
 32 files changed, 1077 insertions(+), 221 deletions(-)

diff --git a/geaflow/geaflow-core/geaflow-api/src/main/java/org/apache/geaflow/view/IViewDesc.java b/geaflow/geaflow-core/geaflow-api/src/main/java/org/apache/geaflow/view/IViewDesc.java
index 272d896d..9bd006b6 100644
--- a/geaflow/geaflow-core/geaflow-api/src/main/java/org/apache/geaflow/view/IViewDesc.java
+++ b/geaflow/geaflow-core/geaflow-api/src/main/java/org/apache/geaflow/view/IViewDesc.java
@@ -64,6 +64,8 @@ public interface IViewDesc extends Serializable {
         RocksDB,
         // Memory backend.
         Memory,
+        // Paimon backend.
+        Paimon,
         // Custom backend.
         Custom;
 
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/GraphViewMaterializeOp.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/GraphViewMaterializeOp.java
index fa4d6aef..46b44048 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/GraphViewMaterializeOp.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/GraphViewMaterializeOp.java
@@ -24,6 +24,7 @@ import static org.apache.geaflow.operator.Constants.GRAPH_VERSION;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.util.List;
 import org.apache.geaflow.api.graph.materialize.GraphMaterializeFunction;
 import org.apache.geaflow.api.trait.CheckpointTrait;
 import org.apache.geaflow.api.trait.TransactionTrait;
@@ -36,12 +37,17 @@ import org.apache.geaflow.state.DataModel;
 import org.apache.geaflow.state.GraphState;
 import org.apache.geaflow.state.StateFactory;
 import org.apache.geaflow.state.descriptor.GraphStateDescriptor;
+import org.apache.geaflow.store.paimon.commit.PaimonCommitRegistry;
+import 
org.apache.geaflow.store.paimon.commit.PaimonCommitRegistry.TaskCommitMessage;
+import org.apache.geaflow.store.paimon.commit.PaimonMessage;
 import org.apache.geaflow.utils.keygroup.IKeyGroupAssigner;
 import org.apache.geaflow.utils.keygroup.KeyGroup;
 import org.apache.geaflow.utils.keygroup.KeyGroupAssignerFactory;
 import org.apache.geaflow.utils.keygroup.KeyGroupAssignment;
+import org.apache.geaflow.view.IViewDesc.BackendType;
 import org.apache.geaflow.view.graph.GraphViewDesc;
 import org.apache.geaflow.view.meta.ViewMetaBookKeeper;
+import org.apache.paimon.table.sink.CommitMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +78,8 @@ public class GraphViewMaterializeOp<K, VV, EV> extends AbstractOneInputOperator<
         int maxPara = graphViewDesc.getShardNum();
         int taskPara = runtimeContext.getTaskArgs().getParallelism();
         Preconditions.checkArgument(taskPara <= maxPara,
-            String.format("task parallelism '%s' must be <= shard num(max 
parallelism) '%s'", taskPara, maxPara));
+            String.format("task parallelism '%s' must be <= shard num(max 
parallelism) '%s'",
+                taskPara, maxPara));
 
         int taskIndex = runtimeContext.getTaskArgs().getTaskIndex();
         KeyGroup keyGroup = 
KeyGroupAssignment.computeKeyGroupRangeForOperatorIndex(maxPara,
@@ -85,7 +92,8 @@ public class GraphViewMaterializeOp<K, VV, EV> extends AbstractOneInputOperator<
         int taskId = runtimeContext.getTaskArgs().getTaskId();
         LOGGER.info("opName:{} taskId:{} taskIndex:{} keyGroup:{}", name, 
taskId,
             taskIndex, keyGroup);
-        this.graphState = StateFactory.buildGraphState(descriptor, 
runtimeContext.getConfiguration());
+        this.graphState = StateFactory.buildGraphState(descriptor,
+            runtimeContext.getConfiguration());
         recover();
         this.function = new DynamicGraphMaterializeFunction<>(graphState);
     }
@@ -105,12 +113,28 @@ public class GraphViewMaterializeOp<K, VV, EV> extends AbstractOneInputOperator<
         this.graphState.manage().operate().setCheckpointId(checkpointId);
         this.graphState.manage().operate().finish();
         this.graphState.manage().operate().archive();
+
+        if (graphViewDesc.getBackend() == BackendType.Paimon) {
+            int taskIndex = runtimeContext.getTaskArgs().getTaskIndex();
+
+            PaimonCommitRegistry registry = PaimonCommitRegistry.getInstance();
+            List<TaskCommitMessage> messages = 
registry.pollMessages(taskIndex);
+            if (messages != null && !messages.isEmpty()) {
+                for (TaskCommitMessage message : messages) {
+                    List<CommitMessage> msg = message.getMessages();
+                    LOGGER.info("task {} emits windowId:{} chkId:{} table:{} 
messages:{}",
+                        taskIndex, windowId, checkpointId, 
message.getTableName(),
+                        msg.size());
+                    collectValue(new PaimonMessage(checkpointId, 
message.getTableName(),
+                        msg));
+                }
+            }
+        }
         LOGGER.info("do checkpoint over, checkpointId: {}", checkpointId);
     }
 
     @Override
     public void finish(long windowId) {
-
     }
 
     @Override
@@ -159,7 +183,8 @@ public class GraphViewMaterializeOp<K, VV, EV> extends AbstractOneInputOperator<
         }
     }
 
-    public static class DynamicGraphMaterializeFunction<K, VV, EV> implements 
GraphMaterializeFunction<K, VV, EV> {
+    public static class DynamicGraphMaterializeFunction<K, VV, EV> implements
+        GraphMaterializeFunction<K, VV, EV> {
 
         private final GraphState<K, VV, EV> graphState;
 
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/PaimonGlobalSink.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/PaimonGlobalSink.java
new file mode 100644
index 00000000..44ba94ae
--- /dev/null
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/PaimonGlobalSink.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.operator.impl.graph.materialize;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.geaflow.api.context.RuntimeContext;
+import org.apache.geaflow.api.function.RichWindowFunction;
+import org.apache.geaflow.api.function.io.SinkFunction;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
+import org.apache.geaflow.common.exception.GeaflowRuntimeException;
+import org.apache.geaflow.store.paimon.PaimonCatalogClient;
+import org.apache.geaflow.store.paimon.PaimonCatalogManager;
+import org.apache.geaflow.store.paimon.commit.PaimonMessage;
+import org.apache.geaflow.store.paimon.config.PaimonConfigKeys;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PaimonGlobalSink extends RichWindowFunction implements 
SinkFunction<PaimonMessage> {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PaimonGlobalSink.class);
+
+    private String dbName;
+    private String jobName;
+    private long windowId;
+
+    private List<PaimonMessage> paimonMessages;
+    private Map<String, StreamWriteBuilder> writeBuilders;
+    private PaimonCatalogClient client;
+
+    @Override
+    public void open(RuntimeContext runtimeContext) {
+        this.windowId = runtimeContext.getWindowId();
+        Configuration jobConfig = runtimeContext.getConfiguration();
+        this.client = PaimonCatalogManager.getCatalogClient(jobConfig);
+        this.jobName = jobConfig.getString(ExecutionConfigKeys.JOB_APP_NAME);
+        this.dbName = 
jobConfig.getString(PaimonConfigKeys.PAIMON_STORE_DATABASE);
+
+        this.writeBuilders = new HashMap<>();
+        this.paimonMessages = new ArrayList<>();
+        LOGGER.info("init paimon sink with db {}", dbName);
+    }
+
+    @Override
+    public void write(PaimonMessage message) throws Exception {
+        paimonMessages.add(message);
+    }
+
+    @Override
+    public void finish() {
+        if (paimonMessages.isEmpty()) {
+            LOGGER.info("commit windowId {} empty messages", windowId);
+            return;
+        }
+
+        final long startTime = System.currentTimeMillis();
+        final long checkpointId = paimonMessages.get(0).getCheckpointId();
+        Map<String, List<CommitMessage>> tableMessages = new HashMap<>();
+        for (PaimonMessage message : paimonMessages) {
+            List<CommitMessage> messages = message.getMessages();
+            if (messages != null && !messages.isEmpty()) {
+                String tableName = message.getTableName();
+                List<CommitMessage> commitMessages = 
tableMessages.computeIfAbsent(tableName,
+                    k -> new ArrayList<>());
+                commitMessages.addAll(messages);
+            }
+        }
+        long deserializeTime = System.currentTimeMillis() - startTime;
+        if (!tableMessages.isEmpty()) {
+            for (Map.Entry<String, List<CommitMessage>> entry : 
tableMessages.entrySet()) {
+                LOGGER.info("commit table:{} messages:{}", entry.getKey(), 
entry.getValue().size());
+                StreamWriteBuilder writeBuilder = 
getWriteBuilder(entry.getKey());
+                try (StreamTableCommit commit = writeBuilder.newCommit()) {
+                    try {
+                        commit.commit(checkpointId, entry.getValue());
+                    } catch (Throwable e) {
+                        LOGGER.warn("commit failed: {}", e.getMessage(), e);
+                        Map<Long, List<CommitMessage>> commitIdAndMessages = 
new HashMap<>();
+                        commitIdAndMessages.put(checkpointId, 
entry.getValue());
+                        commit.filterAndCommit(commitIdAndMessages);
+                    }
+                } catch (Throwable e) {
+                    LOGGER.error("Failed to commit data into Paimon: {}", 
e.getMessage(), e);
+                    throw new GeaflowRuntimeException("Failed to commit data 
into Paimon.", e);
+                }
+            }
+        }
+        LOGGER.info("committed chkId:{} messages:{} deserializeCost:{}ms",
+            checkpointId, paimonMessages.size(), deserializeTime);
+        paimonMessages.clear();
+    }
+
+    private StreamWriteBuilder getWriteBuilder(String tableName) {
+        return writeBuilders.computeIfAbsent(tableName, k -> {
+            try {
+                FileStoreTable table = (FileStoreTable) 
client.getTable(Identifier.create(dbName,
+                    tableName));
+                return table.newStreamWriteBuilder().withCommitUser(jobName);
+            } catch (Throwable e) {
+                String msg = String.format("%s.%s not exist.", dbName, 
tableName);
+                throw new GeaflowRuntimeException(msg, e);
+            }
+        });
+    }
+
+    @Override
+    public void close() {
+        LOGGER.info("close sink");
+        if (client != null) {
+            client.close();
+        }
+    }
+
+}
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/graph/view/materialize/MaterializedIncGraph.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/graph/view/materialize/MaterializedIncGraph.java
index a8beef88..45bfd98a 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/graph/view/materialize/MaterializedIncGraph.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/graph/view/materialize/MaterializedIncGraph.java
@@ -27,14 +27,19 @@ import org.apache.geaflow.model.graph.edge.IEdge;
 import org.apache.geaflow.model.graph.vertex.IVertex;
 import org.apache.geaflow.operator.base.AbstractOperator;
 import 
org.apache.geaflow.operator.impl.graph.materialize.GraphViewMaterializeOp;
+import org.apache.geaflow.operator.impl.graph.materialize.PaimonGlobalSink;
+import org.apache.geaflow.operator.impl.window.SinkOperator;
 import org.apache.geaflow.partitioner.IPartitioner;
 import org.apache.geaflow.partitioner.impl.KeyPartitioner;
 import org.apache.geaflow.pdata.graph.view.AbstractGraphView;
 import org.apache.geaflow.pdata.graph.window.WindowStreamGraph;
 import org.apache.geaflow.pdata.stream.Stream;
 import org.apache.geaflow.pdata.stream.TransformType;
+import org.apache.geaflow.pdata.stream.window.WindowStreamSink;
 import org.apache.geaflow.pipeline.context.IPipelineContext;
+import org.apache.geaflow.store.paimon.commit.PaimonMessage;
 import org.apache.geaflow.view.IViewDesc;
+import org.apache.geaflow.view.IViewDesc.BackendType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,7 +72,16 @@ public class MaterializedIncGraph<K, VV, EV> extends AbstractGraphView<K, VV, EV
             + "edgeStream parallelism must <= number of graph shard num";
         this.edgeStream = this.edgeStream
             .keyBy(new WindowStreamGraph.DefaultEdgePartition<>());
-        super.context.addPAction(this);
+
+        if (graphViewDesc.getBackend() == BackendType.Paimon) {
+            SinkOperator<PaimonMessage> operator =
+                new SinkOperator<>(new PaimonGlobalSink());
+            WindowStreamSink globalSink = new WindowStreamSink<>(this, 
operator);
+            globalSink.withParallelism(1);
+            super.context.addPAction(globalSink);
+        } else {
+            super.context.addPAction(this);
+        }
 
         assert this.getParallelism() == graphViewDesc.getShardNum() : 
"Materialize parallelism "
             + "must be equal to the graph shard num.";
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/stream/window/WindowStreamSink.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/stream/window/WindowStreamSink.java
index 4dc6de78..9c346013 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/stream/window/WindowStreamSink.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/stream/window/WindowStreamSink.java
@@ -35,8 +35,7 @@ public class WindowStreamSink<T> extends Stream<T> implements PStreamSink<T> {
         super(pipelineContext, operator);
     }
 
-    public WindowStreamSink(Stream stream,
-                            AbstractOperator operator) {
+    public WindowStreamSink(Stream stream, AbstractOperator operator) {
         super(stream, operator);
     }
 
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/main/java/org/apache/geaflow/plan/PipelinePlanBuilder.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/main/java/org/apache/geaflow/plan/PipelinePlanBuilder.java
index e9240115..8ab196b4 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/main/java/org/apache/geaflow/plan/PipelinePlanBuilder.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/main/java/org/apache/geaflow/plan/PipelinePlanBuilder.java
@@ -102,7 +102,8 @@ public class PipelinePlanBuilder implements Serializable {
         });
 
         boolean isSingleWindow = 
this.pipelineGraph.getSourceVertices().stream().allMatch(v ->
-            ((AbstractOperator) v.getOperator()).getOpArgs().getOpType() == 
OpArgs.OpType.SINGLE_WINDOW_SOURCE);
+            ((AbstractOperator) v.getOperator()).getOpArgs().getOpType()
+                == OpArgs.OpType.SINGLE_WINDOW_SOURCE);
         if (isSingleWindow) {
             
pipelineContext.getConfig().put(BATCH_NUMBER_PER_CHECKPOINT.getKey(),
                 String.valueOf(SINGLE_WINDOW_CHECKPOINT_DURATION));
@@ -143,7 +144,8 @@ public class PipelinePlanBuilder implements Serializable {
             if (action instanceof PGraphMaterialize) {
                 visitMaterializeAction((PGraphMaterialize) action);
             } else {
-                PipelineVertex pipelineVertex = new PipelineVertex(vId, 
stream.getOperator(), stream.getParallelism());
+                PipelineVertex pipelineVertex = new PipelineVertex(vId, 
stream.getOperator(),
+                    stream.getParallelism());
                 if (action instanceof PStreamSink) {
                     pipelineVertex.setType(VertexType.sink);
                     pipelineVertex.setVertexMode(VertexMode.append);
@@ -178,11 +180,13 @@ public class PipelinePlanBuilder implements Serializable {
         Preconditions.checkArgument(vertexStreamInput != null && 
edgeStreamInput != null,
             "input vertex and edge stream must be not null");
 
-        PipelineEdge vertexInputEdge = new 
PipelineEdge(this.edgeIdGenerator++, vertexStreamInput.getId(),
+        PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++,
+            vertexStreamInput.getId(),
             stream.getId(), vertexStreamInput.getPartition(), 
vertexStreamInput.getEncoder());
         vertexInputEdge.setEdgeName(GraphRecordNames.Vertex.name());
         this.pipelineGraph.addEdge(vertexInputEdge);
-        PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++, 
edgeStreamInput.getId(),
+        PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++,
+            edgeStreamInput.getId(),
             stream.getId(), edgeStreamInput.getPartition(), 
edgeStreamInput.getEncoder());
         edgeInputEdge.setEdgeName(GraphRecordNames.Edge.name());
         this.pipelineGraph.addEdge(edgeInputEdge);
@@ -207,6 +211,30 @@ public class PipelinePlanBuilder implements Serializable {
                     pipelineVertex.setAffinity(AffinityLevel.worker);
                     break;
                 }
+                case ContinueGraphMaterialize:
+                    pipelineVertex.setType(VertexType.inc_process);
+                    pipelineVertex.setAffinity(AffinityLevel.worker);
+                    MaterializedIncGraph pGraphMaterialize = 
(MaterializedIncGraph) stream;
+
+                    Stream vertexInput = pGraphMaterialize.getInput();
+                    Stream edgeInput = (Stream) pGraphMaterialize.getEdges();
+                    Preconditions.checkArgument(vertexInput != null && 
edgeInput != null,
+                        "input vertex and edge stream must be not null");
+
+                    PipelineEdge vertexEdge = new 
PipelineEdge(this.edgeIdGenerator++,
+                        vertexInput.getId(),
+                        stream.getId(), vertexInput.getPartition(), 
vertexInput.getEncoder());
+                    vertexEdge.setEdgeName(GraphRecordNames.Vertex.name());
+                    this.pipelineGraph.addEdge(vertexEdge);
+                    PipelineEdge edgeEdge = new 
PipelineEdge(this.edgeIdGenerator++,
+                        edgeInput.getId(),
+                        stream.getId(), edgeInput.getPartition(), 
edgeInput.getEncoder());
+                    edgeEdge.setEdgeName(GraphRecordNames.Edge.name());
+                    this.pipelineGraph.addEdge(edgeEdge);
+
+                    visitNode(vertexInput);
+                    visitNode(edgeInput);
+                    break;
                 case ContinueGraphCompute: {
                     pipelineVertex.setType(VertexType.inc_iterator);
                     pipelineVertex.setAffinity(AffinityLevel.worker);
@@ -217,26 +245,33 @@ public class PipelinePlanBuilder implements Serializable {
                             
pipelineVertex.setType(VertexType.inc_vertex_centric);
                             break;
                         default:
-                            throw new GeaflowRuntimeException("not support 
graph compute type, " + computeType);
+                            throw new GeaflowRuntimeException(
+                                "not support graph compute type, " + 
computeType);
                     }
 
                     
pipelineVertex.setIterations(pGraphCompute.getMaxIterations());
                     Stream vertexStreamInput = pGraphCompute.getInput();
                     Stream edgeStreamInput = (Stream) pGraphCompute.getEdges();
-                    Preconditions.checkArgument(vertexStreamInput != null && 
edgeStreamInput != null,
+                    Preconditions.checkArgument(
+                        vertexStreamInput != null && edgeStreamInput != null,
                         "input vertex and edge stream must be not null");
 
-                    PipelineEdge vertexInputEdge = new 
PipelineEdge(this.edgeIdGenerator++, vertexStreamInput.getId(),
-                        stream.getId(), vertexStreamInput.getPartition(), 
vertexStreamInput.getEncoder());
+                    PipelineEdge vertexInputEdge = new 
PipelineEdge(this.edgeIdGenerator++,
+                        vertexStreamInput.getId(),
+                        stream.getId(), vertexStreamInput.getPartition(),
+                        vertexStreamInput.getEncoder());
                     
vertexInputEdge.setEdgeName(GraphRecordNames.Vertex.name());
                     this.pipelineGraph.addEdge(vertexInputEdge);
-                    PipelineEdge edgeInputEdge = new 
PipelineEdge(this.edgeIdGenerator++, edgeStreamInput.getId(),
-                        stream.getId(), edgeStreamInput.getPartition(), 
edgeStreamInput.getEncoder());
+                    PipelineEdge edgeInputEdge = new 
PipelineEdge(this.edgeIdGenerator++,
+                        edgeStreamInput.getId(),
+                        stream.getId(), edgeStreamInput.getPartition(),
+                        edgeStreamInput.getEncoder());
                     edgeInputEdge.setEdgeName(GraphRecordNames.Edge.name());
                     this.pipelineGraph.addEdge(edgeInputEdge);
 
                     // iteration loop edge
-                    PipelineEdge iterationEdge = buildIterationEdge(vId, 
pGraphCompute.getMsgEncoder());
+                    PipelineEdge iterationEdge = buildIterationEdge(vId,
+                        pGraphCompute.getMsgEncoder());
                     this.pipelineGraph.addEdge(iterationEdge);
 
                     buildIterationAggVertexAndEdge(pipelineVertex);
@@ -255,26 +290,33 @@ public class PipelinePlanBuilder implements Serializable {
                             pipelineVertex.setType(VertexType.vertex_centric);
                             break;
                         default:
-                            throw new GeaflowRuntimeException("not support 
graph compute type, " + computeType);
+                            throw new GeaflowRuntimeException(
+                                "not support graph compute type, " + 
computeType);
                     }
 
                     
pipelineVertex.setIterations(pGraphCompute.getMaxIterations());
                     Stream vertexStreamInput = pGraphCompute.getInput();
                     Stream edgeStreamInput = (Stream) pGraphCompute.getEdges();
-                    Preconditions.checkArgument(vertexStreamInput != null && 
edgeStreamInput != null,
+                    Preconditions.checkArgument(
+                        vertexStreamInput != null && edgeStreamInput != null,
                         "input vertex and edge stream must be not null");
 
-                    PipelineEdge vertexInputEdge = new 
PipelineEdge(this.edgeIdGenerator++, vertexStreamInput.getId(),
-                        stream.getId(), vertexStreamInput.getPartition(), 
vertexStreamInput.getEncoder());
+                    PipelineEdge vertexInputEdge = new 
PipelineEdge(this.edgeIdGenerator++,
+                        vertexStreamInput.getId(),
+                        stream.getId(), vertexStreamInput.getPartition(),
+                        vertexStreamInput.getEncoder());
                     
vertexInputEdge.setEdgeName(GraphRecordNames.Vertex.name());
                     this.pipelineGraph.addEdge(vertexInputEdge);
-                    PipelineEdge edgeInputEdge = new 
PipelineEdge(this.edgeIdGenerator++, edgeStreamInput.getId(),
-                        stream.getId(), edgeStreamInput.getPartition(), 
edgeStreamInput.getEncoder());
+                    PipelineEdge edgeInputEdge = new 
PipelineEdge(this.edgeIdGenerator++,
+                        edgeStreamInput.getId(),
+                        stream.getId(), edgeStreamInput.getPartition(),
+                        edgeStreamInput.getEncoder());
                     edgeInputEdge.setEdgeName(GraphRecordNames.Edge.name());
                     this.pipelineGraph.addEdge(edgeInputEdge);
 
                     // iteration loop edge
-                    PipelineEdge iterationEdge = buildIterationEdge(vId, 
pGraphCompute.getMsgEncoder());
+                    PipelineEdge iterationEdge = buildIterationEdge(vId,
+                        pGraphCompute.getMsgEncoder());
                     this.pipelineGraph.addEdge(iterationEdge);
 
                     buildIterationAggVertexAndEdge(pipelineVertex);
@@ -293,28 +335,35 @@ public class PipelinePlanBuilder implements Serializable {
                             pipelineVertex.setType(VertexType.vertex_centric);
                             break;
                         default:
-                            throw new GeaflowRuntimeException("not support 
graph traversal type, " + traversalType);
+                            throw new GeaflowRuntimeException(
+                                "not support graph traversal type, " + 
traversalType);
                     }
 
                     
pipelineVertex.setIterations(windowGraph.getMaxIterations());
                     Stream vertexStreamInput = windowGraph.getInput();
                     Stream edgeStreamInput = (Stream) windowGraph.getEdges();
-                    Preconditions.checkArgument(vertexStreamInput != null && 
edgeStreamInput != null,
+                    Preconditions.checkArgument(
+                        vertexStreamInput != null && edgeStreamInput != null,
                         "input vertex and edge stream must be not null");
 
-                    PipelineEdge vertexInputEdge = new 
PipelineEdge(this.edgeIdGenerator++, vertexStreamInput.getId(),
-                        stream.getId(), vertexStreamInput.getPartition(), 
vertexStreamInput.getEncoder());
+                    PipelineEdge vertexInputEdge = new 
PipelineEdge(this.edgeIdGenerator++,
+                        vertexStreamInput.getId(),
+                        stream.getId(), vertexStreamInput.getPartition(),
+                        vertexStreamInput.getEncoder());
                     
vertexInputEdge.setEdgeName(GraphRecordNames.Vertex.name());
                     this.pipelineGraph.addEdge(vertexInputEdge);
-                    PipelineEdge edgeInputEdge = new 
PipelineEdge(this.edgeIdGenerator++, edgeStreamInput.getId(),
-                        stream.getId(), edgeStreamInput.getPartition(), 
edgeStreamInput.getEncoder());
+                    PipelineEdge edgeInputEdge = new 
PipelineEdge(this.edgeIdGenerator++,
+                        edgeStreamInput.getId(),
+                        stream.getId(), edgeStreamInput.getPartition(),
+                        edgeStreamInput.getEncoder());
                     edgeInputEdge.setEdgeName(GraphRecordNames.Edge.name());
                     this.pipelineGraph.addEdge(edgeInputEdge);
 
                     // Add request input.
                     if (windowGraph.getRequestStream() != null) {
                         Stream requestStreamInput = (Stream) 
windowGraph.getRequestStream();
-                        PipelineEdge requestInputEdge = new 
PipelineEdge(this.edgeIdGenerator++, requestStreamInput.getId(),
+                        PipelineEdge requestInputEdge = new 
PipelineEdge(this.edgeIdGenerator++,
+                            requestStreamInput.getId(),
                             stream.getId(), requestStreamInput.getPartition(),
                             requestStreamInput.getEncoder());
                         
requestInputEdge.setEdgeName(GraphRecordNames.Request.name());
@@ -323,7 +372,8 @@ public class PipelinePlanBuilder implements Serializable {
                     }
 
                     // iteration loop edge
-                    PipelineEdge iterationEdge = buildIterationEdge(vId, 
windowGraph.getMsgEncoder());
+                    PipelineEdge iterationEdge = buildIterationEdge(vId,
+                        windowGraph.getMsgEncoder());
                     this.pipelineGraph.addEdge(iterationEdge);
 
                     buildIterationAggVertexAndEdge(pipelineVertex);
@@ -342,31 +392,38 @@ public class PipelinePlanBuilder implements Serializable {
                             
pipelineVertex.setType(VertexType.inc_vertex_centric);
                             break;
                         default:
-                            throw new GeaflowRuntimeException("not support 
graph traversal type, " + traversalType);
+                            throw new GeaflowRuntimeException(
+                                "not support graph traversal type, " + 
traversalType);
                     }
 
                     
pipelineVertex.setIterations(windowGraph.getMaxIterations());
                     Stream vertexStreamInput = windowGraph.getInput();
                     Stream edgeStreamInput = (Stream) windowGraph.getEdges();
-                    Preconditions.checkArgument(vertexStreamInput != null && 
edgeStreamInput != null,
+                    Preconditions.checkArgument(
+                        vertexStreamInput != null && edgeStreamInput != null,
                         "input vertex and edge stream must be not null");
 
                     // Add vertex input.
-                    PipelineEdge vertexInputEdge = new 
PipelineEdge(this.edgeIdGenerator++, vertexStreamInput.getId(),
-                        stream.getId(), vertexStreamInput.getPartition(), 
vertexStreamInput.getEncoder());
+                    PipelineEdge vertexInputEdge = new 
PipelineEdge(this.edgeIdGenerator++,
+                        vertexStreamInput.getId(),
+                        stream.getId(), vertexStreamInput.getPartition(),
+                        vertexStreamInput.getEncoder());
                     
vertexInputEdge.setEdgeName(GraphRecordNames.Vertex.name());
                     this.pipelineGraph.addEdge(vertexInputEdge);
 
                     // Add edge input.
-                    PipelineEdge edgeInputEdge = new 
PipelineEdge(this.edgeIdGenerator++, edgeStreamInput.getId(),
-                        stream.getId(), edgeStreamInput.getPartition(), 
edgeStreamInput.getEncoder());
+                    PipelineEdge edgeInputEdge = new 
PipelineEdge(this.edgeIdGenerator++,
+                        edgeStreamInput.getId(),
+                        stream.getId(), edgeStreamInput.getPartition(),
+                        edgeStreamInput.getEncoder());
                     edgeInputEdge.setEdgeName(GraphRecordNames.Edge.name());
                     this.pipelineGraph.addEdge(edgeInputEdge);
 
                     // Add request input.
                     if (windowGraph.getRequestStream() != null) {
                         Stream requestStreamInput = (Stream) 
windowGraph.getRequestStream();
-                        PipelineEdge requestInputEdge = new 
PipelineEdge(this.edgeIdGenerator++, requestStreamInput.getId(),
+                        PipelineEdge requestInputEdge = new 
PipelineEdge(this.edgeIdGenerator++,
+                            requestStreamInput.getId(),
                             stream.getId(), requestStreamInput.getPartition(),
                             requestStreamInput.getEncoder());
                         
requestInputEdge.setEdgeName(GraphRecordNames.Request.name());
@@ -375,7 +432,8 @@ public class PipelinePlanBuilder implements Serializable {
                     }
 
                     // Add iteration loop edge
-                    PipelineEdge iterationEdge = buildIterationEdge(vId, 
windowGraph.getMsgEncoder());
+                    PipelineEdge iterationEdge = buildIterationEdge(vId,
+                        windowGraph.getMsgEncoder());
                     this.pipelineGraph.addEdge(iterationEdge);
 
                     buildIterationAggVertexAndEdge(pipelineVertex);
@@ -387,9 +445,11 @@ public class PipelinePlanBuilder implements Serializable {
                 case StreamTransform: {
                     pipelineVertex.setType(VertexType.process);
                     Stream inputStream = stream.getInput();
-                    Preconditions.checkArgument(inputStream != null, "input 
stream must be not null");
+                    Preconditions.checkArgument(inputStream != null,
+                        "input stream must be not null");
 
-                    PipelineEdge pipelineEdge = new 
PipelineEdge(this.edgeIdGenerator++, inputStream.getId(), stream.getId(),
+                    PipelineEdge pipelineEdge = new 
PipelineEdge(this.edgeIdGenerator++,
+                        inputStream.getId(), stream.getId(),
                         inputStream.getPartition(), inputStream.getEncoder());
                     this.pipelineGraph.addEdge(pipelineEdge);
 
@@ -400,9 +460,11 @@ public class PipelinePlanBuilder implements Serializable {
                     pipelineVertex.setType(VertexType.inc_process);
                     pipelineVertex.setAffinity(AffinityLevel.worker);
                     Stream inputStream = stream.getInput();
-                    Preconditions.checkArgument(inputStream != null, "input 
stream must be not null");
+                    Preconditions.checkArgument(inputStream != null,
+                        "input stream must be not null");
 
-                    PipelineEdge pipelineEdge = new 
PipelineEdge(this.edgeIdGenerator++, inputStream.getId(), stream.getId(),
+                    PipelineEdge pipelineEdge = new 
PipelineEdge(this.edgeIdGenerator++,
+                        inputStream.getId(), stream.getId(),
                         inputStream.getPartition(), inputStream.getEncoder());
                     this.pipelineGraph.addEdge(pipelineEdge);
 
@@ -414,7 +476,8 @@ public class PipelinePlanBuilder implements Serializable {
                     WindowUnionStream unionStream = (WindowUnionStream) stream;
 
                     Stream mainInput = stream.getInput();
-                    PipelineEdge mainEdge = new 
PipelineEdge(this.edgeIdGenerator++, mainInput.getId(), unionStream.getId(),
+                    PipelineEdge mainEdge = new 
PipelineEdge(this.edgeIdGenerator++,
+                        mainInput.getId(), unionStream.getId(),
                         mainInput.getPartition(), unionStream.getEncoder());
                     mainEdge.setStreamOrdinal(0);
                     this.pipelineGraph.addEdge(mainEdge);
@@ -423,8 +486,10 @@ public class PipelinePlanBuilder implements Serializable {
                     List<Stream> otherInputs = 
unionStream.getUnionWindowDataStreamList();
                     for (int index = 0; index < otherInputs.size(); index++) {
                         Stream otherInput = otherInputs.get(index);
-                        PipelineEdge rightEdge = new 
PipelineEdge(this.edgeIdGenerator++, otherInput.getId(),
-                            unionStream.getId(), otherInput.getPartition(), 
otherInput.getEncoder());
+                        PipelineEdge rightEdge = new 
PipelineEdge(this.edgeIdGenerator++,
+                            otherInput.getId(),
+                            unionStream.getId(), otherInput.getPartition(),
+                            otherInput.getEncoder());
                         rightEdge.setStreamOrdinal(index + 1);
                         this.pipelineGraph.addEdge(rightEdge);
                         visitNode(otherInput);
@@ -432,7 +497,8 @@ public class PipelinePlanBuilder implements Serializable {
                     break;
                 }
                 default:
-                    throw new GeaflowRuntimeException("Not supported transform 
type: " + stream.getTransformType());
+                    throw new GeaflowRuntimeException(
+                        "Not supported transform type: " + 
stream.getTransformType());
             }
             this.pipelineGraph.addVertex(pipelineVertex);
         }
@@ -474,7 +540,8 @@ public class PipelinePlanBuilder implements Serializable {
             // Union Optimization.
             boolean isExtraOptimizeSink = 
pipelineConfig.getBoolean(ENABLE_EXTRA_OPTIMIZE_SINK);
             new 
UnionOptimizer(isExtraOptimizeSink).optimizePlan(pipelineGraph);
-            LOGGER.info("union optimize: {}", new 
PlanGraphVisualization(pipelineGraph).getGraphviz());
+            LOGGER.info("union optimize: {}",
+                new PlanGraphVisualization(pipelineGraph).getGraphviz());
         }
         new PipelineGraphOptimizer().optimizePipelineGraph(pipelineGraph);
     }
diff --git 
a/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/test/java/org/apache/geaflow/plan/PipelinePlanTest.java
 
b/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/test/java/org/apache/geaflow/plan/PipelinePlanTest.java
index 5926b0ca..26ff4f35 100644
--- 
a/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/test/java/org/apache/geaflow/plan/PipelinePlanTest.java
+++ 
b/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/test/java/org/apache/geaflow/plan/PipelinePlanTest.java
@@ -60,6 +60,7 @@ import org.apache.geaflow.plan.graph.PipelineVertex;
 import org.apache.geaflow.plan.optimizer.PipelineGraphOptimizer;
 import org.apache.geaflow.view.GraphViewBuilder;
 import org.apache.geaflow.view.IViewDesc;
+import org.apache.geaflow.view.IViewDesc.BackendType;
 import org.apache.geaflow.view.graph.GraphViewDesc;
 import org.apache.geaflow.view.graph.PGraphView;
 import org.apache.geaflow.view.graph.PIncGraphView;
@@ -186,14 +187,15 @@ public class PipelinePlanTest extends BasePlanTest {
         Assert.assertEquals(vertexMap.size(), 8);
     }
 
-    @Test
-    public void testMaterialize() {
-        AtomicInteger idGenerator = new AtomicInteger(0);
-        AbstractPipelineContext context = mock(AbstractPipelineContext.class);
-        when(context.generateId()).then(invocation -> 
idGenerator.incrementAndGet());
+    public void testMaterialize(IViewDesc.BackendType backendType) {
         Configuration configuration = new Configuration();
         configuration.put(FrameworkConfigKeys.INC_STREAM_MATERIALIZE_DISABLE, 
Boolean.TRUE.toString());
-        when(context.getConfig()).thenReturn(configuration);
+        AbstractPipelineContext context = new 
AbstractPipelineContext(configuration) {
+            @Override
+            public int generateId() {
+                return idGenerator.incrementAndGet();
+            }
+        };
 
         PWindowSource<Integer> source1 =
             new WindowStreamSource<>(context, new CollectionSource<>(new 
ArrayList<>()), SizeTumblingWindow.of(10));
@@ -208,7 +210,7 @@ public class PipelinePlanTest extends BasePlanTest {
         final String graphName = "graph_view_name";
         GraphViewDesc graphViewDesc = 
GraphViewBuilder.createGraphView(graphName)
             .withShardNum(4)
-            .withBackend(IViewDesc.BackendType.RocksDB)
+            .withBackend(backendType)
             .withSchema(new GraphMetaType(IntegerType.INSTANCE, 
ValueVertex.class,
                 Integer.class, ValueEdge.class, IntegerType.class))
             .build();
@@ -219,7 +221,6 @@ public class PipelinePlanTest extends BasePlanTest {
                 vertices.window(WindowFactory.createSizeTumblingWindow(1)),
                 edges.window(WindowFactory.createSizeTumblingWindow(1)));
         incGraphView.materialize();
-        when(context.getActions()).thenReturn(ImmutableList.of(((IncGraphView) 
incGraphView).getMaterializedIncGraph()));
 
         PipelinePlanBuilder planBuilder = new PipelinePlanBuilder();
         PipelineGraph pipelineGraph = planBuilder.buildPlan(context);
@@ -227,8 +228,21 @@ public class PipelinePlanTest extends BasePlanTest {
         optimizer.optimizePipelineGraph(pipelineGraph);
 
         Map<Integer, PipelineVertex> vertexMap = pipelineGraph.getVertexMap();
-        Assert.assertEquals(vertexMap.size(), 3);
+        if (backendType == BackendType.Paimon) {
+            Assert.assertEquals(vertexMap.size(), 4);
+        } else {
+            Assert.assertEquals(vertexMap.size(), 3);
+        }
+    }
 
+    /**
+     * Runs the shared materialize plan check with the RocksDB backend; for any
+     * non-Paimon backend {@code testMaterialize} asserts 3 vertices in the
+     * optimized pipeline graph.
+     */
+    @Test
+    public void testRocksDBMaterialize() {
+        testMaterialize(BackendType.RocksDB);
+    }
+
+    /**
+     * Runs the shared materialize plan check with the Paimon backend, for which
+     * {@code testMaterialize} asserts 4 vertices in the optimized pipeline graph.
+     */
+    @Test
+    public void testPaimonMaterialize() {
+        testMaterialize(BackendType.Paimon);
     }
 
     @Test
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/engine/GeaFlowRuntimeGraph.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/engine/GeaFlowRuntimeGraph.java
index f8e52c30..168d1f32 100644
--- 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/engine/GeaFlowRuntimeGraph.java
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/engine/GeaFlowRuntimeGraph.java
@@ -19,6 +19,7 @@
 
 package org.apache.geaflow.dsl.runtime.engine;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -73,6 +74,7 @@ import org.apache.geaflow.model.traversal.ITraversalRequest;
 import org.apache.geaflow.model.traversal.ITraversalResponse;
 import 
org.apache.geaflow.operator.impl.graph.traversal.dynamic.DynamicGraphHelper;
 import org.apache.geaflow.pipeline.job.IPipelineJobContext;
+import org.apache.geaflow.view.IViewDesc.BackendType;
 import org.apache.geaflow.view.graph.GraphViewDesc;
 import org.apache.geaflow.view.graph.PGraphView;
 import org.apache.geaflow.view.graph.PIncGraphView;
@@ -179,6 +181,9 @@ public class GeaFlowRuntimeGraph implements RuntimeGraph {
             responsePWindow = staticGraphTraversal(staticGraph, 
parameterStartIds,
                 constantStartIds, executeDagGroup, maxTraversal, 
isAggTraversal, parallelism);
         } else { // traversal on dynamic graph
+            Preconditions.checkArgument(graphViewDesc.getBackend() != 
BackendType.Paimon,
+                "paimon does not support dynamic graph traversal");
+
             boolean enableIncrTraversal = 
DynamicGraphHelper.enableIncrTraversal(maxTraversal, startIds.size(),
                 context.getConfig());
             if (maxTraversal != Integer.MAX_VALUE) {
@@ -357,6 +362,7 @@ public class GeaFlowRuntimeGraph implements RuntimeGraph {
         PWindowStream<ITraversalResponse<Row>> responsePWindow;
         assert graphView instanceof PIncGraphView : "Illegal graph view";
         queryContext.addMaterializedGraph(graph.getName());
+
         if (vertexStream == null && edgeStream == null) { // traversal on 
snapshot of the dynamic graph
             PGraphWindow<Object, Row, Row> staticGraph = 
graphView.snapshot(graphViewDesc.getCurrentVersion());
             boolean enableAlgorithmSplit = algorithm instanceof 
IncrementalAlgorithmUserFunction;
@@ -376,6 +382,9 @@ public class GeaFlowRuntimeGraph implements RuntimeGraph {
                         graphAlgorithm.getParams(), graphSchema, 
parallelism)).start();
             }
         } else { // traversal on dynamic graph
+            Preconditions.checkArgument(graphViewDesc.getBackend() != 
BackendType.Paimon,
+                "paimon does not support dynamic graph traversal");
+
             vertexStream = vertexStream != null ? vertexStream :
                 
queryContext.getEngineContext().createRuntimeTable(queryContext, 
Collections.emptyList())
                     .getPlan();
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLInsertTest.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLInsertTest.java
index 7d7d34e5..2b750f3a 100644
--- 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLInsertTest.java
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLInsertTest.java
@@ -20,6 +20,8 @@
 package org.apache.geaflow.dsl.runtime.query;
 
 import org.apache.geaflow.common.config.keys.FrameworkConfigKeys;
+import org.apache.geaflow.common.exception.GeaflowRuntimeException;
+import org.apache.geaflow.store.paimon.config.PaimonConfigKeys;
 import org.testng.annotations.Test;
 
 public class GQLInsertTest {
@@ -114,4 +116,24 @@ public class GQLInsertTest {
             .execute()
             .checkSinkResult();
     }
+
+    /**
+     * Executes {@code gql_insert_and_graph_006.sql} with Paimon table auto-creation
+     * enabled and checks the sink result. The script inserts rows from a file table
+     * into the {@code person} vertex of a graph declared with {@code storeType='paimon'}.
+     */
+    @Test
+    public void testInsertAndQuery_006() throws Exception {
+        QueryTester
+            .build()
+            
.withConfig(PaimonConfigKeys.PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE.getKey(), 
"true")
+            .withQueryPath("/query/gql_insert_and_graph_006.sql")
+            .execute()
+            .checkSinkResult();
+    }
+
+    /**
+     * Executes {@code gql_insert_and_graph_007.sql} with Paimon table auto-creation
+     * enabled and expects a {@link GeaflowRuntimeException}. Compared with script 006,
+     * this script additionally runs a MATCH query on the Paimon-backed graph.
+     * NOTE(review): the failure presumably comes from the "paimon does not support
+     * dynamic graph traversal" guard; confirm how that error is wrapped.
+     */
+    @Test(expectedExceptions = GeaflowRuntimeException.class)
+    public void testInsertAndQuery_007() throws Exception {
+        QueryTester
+            .build()
+            
.withConfig(PaimonConfigKeys.PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE.getKey(), 
"true")
+            .withQueryPath("/query/gql_insert_and_graph_007.sql")
+            .execute()
+            .checkSinkResult();
+    }
 }
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_insert_and_graph_006.txt
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_insert_and_graph_006.txt
new file mode 100644
index 00000000..e69de29b
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_insert_and_graph_006.sql
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_insert_and_graph_006.sql
new file mode 100644
index 00000000..47f57dd4
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_insert_and_graph_006.sql
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+set geaflow.dsl.window.size = 3;
+
+/* Graph stored in Paimon with a single shard. */
+CREATE GRAPH dy_modern (
+  Vertex person (
+    id bigint ID,
+    name varchar,
+    age int
+  ),
+  Vertex software (
+    id bigint ID,
+    name varchar,
+    lang varchar
+  ),
+  Edge knows (
+    srcId bigint SOURCE ID,
+    targetId bigint DESTINATION ID,
+    weight double
+  ),
+  Edge created (
+    srcId bigint SOURCE ID,
+    targetId bigint DESTINATION ID,
+    weight double
+  )
+) WITH (
+  storeType='paimon',
+  shardCount = 1
+);
+
+/* File sink table; this script declares it but never writes to it. */
+CREATE TABLE tbl_result (
+  id bigint,
+  name varchar,
+  age int
+) WITH (
+  type='file',
+  geaflow.dsl.file.path='${target}'
+);
+
+/* File source table holding the person vertex rows. */
+CREATE TABLE tbl_person (
+  id bigint,
+  name varchar,
+  age int
+) WITH (
+  type='file',
+  geaflow.dsl.file.path='resource:///data/modern_vertex_person.txt'
+);
+
+USE GRAPH dy_modern;
+
+/* Load every row of tbl_person into the person vertex of the graph. */
+INSERT INTO dy_modern.person(id, name, age)
+SELECT * FROM tbl_person;
+;
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_insert_and_graph_007.sql
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_insert_and_graph_007.sql
new file mode 100644
index 00000000..c0586dc1
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_insert_and_graph_007.sql
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+set geaflow.dsl.window.size = 3;
+
+/* Graph stored in Paimon with a single shard. */
+CREATE GRAPH dy_modern (
+  Vertex person (
+    id bigint ID,
+    name varchar,
+    age int
+  ),
+  Vertex software (
+    id bigint ID,
+    name varchar,
+    lang varchar
+  ),
+  Edge knows (
+    srcId bigint SOURCE ID,
+    targetId bigint DESTINATION ID,
+    weight double
+  ),
+  Edge created (
+    srcId bigint SOURCE ID,
+    targetId bigint DESTINATION ID,
+    weight double
+  )
+) WITH (
+  storeType='paimon',
+  shardCount = 1
+);
+
+/* File sink table that receives the MATCH result at the end of the script. */
+CREATE TABLE tbl_result (
+  id bigint,
+  name varchar,
+  age int
+) WITH (
+  type='file',
+  geaflow.dsl.file.path='${target}'
+);
+
+/* File source table holding the person vertex rows. */
+CREATE TABLE tbl_person (
+  id bigint,
+  name varchar,
+  age int
+) WITH (
+  type='file',
+  geaflow.dsl.file.path='resource:///data/modern_vertex_person.txt'
+);
+
+USE GRAPH dy_modern;
+
+/* Load every row of tbl_person into the person vertex of the graph. */
+INSERT INTO dy_modern.person(id, name, age)
+SELECT * FROM tbl_person;
+;
+
+/* Query the person vertices back out of the graph into the sink table. */
+INSERT INTO tbl_result
+MATCH (a:person)
+RETURN a.id, a.name, a.age
+;
diff --git 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonGraphStore.java
 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonGraphStore.java
index 479786c1..49db1c58 100644
--- 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonGraphStore.java
+++ 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonGraphStore.java
@@ -19,27 +19,75 @@
 
 package org.apache.geaflow.store.paimon;
 
+import static 
org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_DATABASE;
+import static 
org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_EDGE_TABLE;
+import static 
org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_INDEX_TABLE;
+import static 
org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_VERTEX_TABLE;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.geaflow.common.config.Configuration;
 import org.apache.geaflow.state.pushdown.inner.CodeGenFilterConverter;
 import org.apache.geaflow.state.pushdown.inner.DirectFilterConverter;
 import org.apache.geaflow.state.pushdown.inner.IFilterConverter;
 import org.apache.geaflow.store.api.graph.IPushDownStore;
 import org.apache.geaflow.store.config.StoreConfigKeys;
 import org.apache.geaflow.store.context.StoreContext;
+import org.apache.paimon.catalog.Identifier;
 
 public abstract class BasePaimonGraphStore extends BasePaimonStore implements 
IPushDownStore {
 
     protected IFilterConverter filterConverter;
+    protected String vertexTable;
+    protected String edgeTable;
+    protected String indexTable;
 
     @Override
     public void init(StoreContext storeContext) {
         super.init(storeContext);
-        boolean codegenEnable =
-            
storeContext.getConfig().getBoolean(StoreConfigKeys.STORE_FILTER_CODEGEN_ENABLE);
-        filterConverter = codegenEnable ? new CodeGenFilterConverter() : new 
DirectFilterConverter();
+        Configuration config = storeContext.getConfig();
+        // Choose the filter converter: code-generated when
+        // STORE_FILTER_CODEGEN_ENABLE is true, direct otherwise.
+        boolean codegenEnable = 
config.getBoolean(StoreConfigKeys.STORE_FILTER_CODEGEN_ENABLE);
+        filterConverter =
+            codegenEnable ? new CodeGenFilterConverter() : new 
DirectFilterConverter();
+        // When a database is configured or distributed mode is enabled, replace the
+        // store name set by super.init() with the configured database and read the
+        // configured vertex/edge/index table names. Otherwise these name fields stay
+        // unset and the create*Table helpers fall back to per-shard table names.
+        if (config.contains(PAIMON_STORE_DATABASE) || isDistributedMode) {
+            paimonStoreName = config.getString(PAIMON_STORE_DATABASE);
+            vertexTable = config.getString(PAIMON_STORE_VERTEX_TABLE);
+            edgeTable = config.getString(PAIMON_STORE_EDGE_TABLE);
+            indexTable = config.getString(PAIMON_STORE_INDEX_TABLE);
+        }
     }
 
     @Override
     public IFilterConverter getFilterConverter() {
+        // Converter selected in init() according to STORE_FILTER_CODEGEN_ENABLE.
         return filterConverter;
     }
+
+    /**
+     * Opens the handle of the vertex table.
+     *
+     * <p>The configured vertex table name is used when it is non-empty; otherwise
+     * the name falls back to {@code vertex#<shardId>}.
+     *
+     * @param shardId shard id used to build the fallback table name
+     * @return read/write handle of the vertex table
+     */
+    protected PaimonTableRWHandle createVertexTable(int shardId) {
+        boolean configured = !StringUtils.isEmpty(vertexTable);
+        String resolvedName = configured
+            ? vertexTable
+            : String.format("%s#%s", "vertex", shardId);
+        return createTable(resolvedName);
+    }
+
+    /**
+     * Opens the handle of the edge table.
+     *
+     * <p>The configured edge table name is used when it is non-empty; otherwise
+     * the name falls back to {@code edge#<shardId>}.
+     *
+     * @param shardId shard id used to build the fallback table name
+     * @return read/write handle of the edge table
+     */
+    protected PaimonTableRWHandle createEdgeTable(int shardId) {
+        boolean configured = !StringUtils.isEmpty(edgeTable);
+        String resolvedName = configured
+            ? edgeTable
+            : String.format("%s#%s", "edge", shardId);
+        return createTable(resolvedName);
+    }
+
+    /**
+     * Opens the handle of the vertex index table.
+     *
+     * <p>The configured index table name is used when it is non-empty; otherwise
+     * the name falls back to {@code vertex_index#<shardId>}.
+     *
+     * @param shardId shard id used to build the fallback table name
+     * @return read/write handle of the vertex index table
+     */
+    protected PaimonTableRWHandle createIndexTable(int shardId) {
+        boolean configured = !StringUtils.isEmpty(indexTable);
+        String resolvedName = configured
+            ? indexTable
+            : String.format("%s#%s", "vertex_index", shardId);
+        return createTable(resolvedName);
+    }
+
+    /**
+     * Builds the identifier of {@code tableName} under {@code paimonStoreName} and
+     * hands it to {@code createKVTableHandle}.
+     *
+     * @param tableName name of the table inside the current store/database
+     * @return read/write handle of the table
+     */
+    private PaimonTableRWHandle createTable(String tableName) {
+        return createKVTableHandle(new Identifier(paimonStoreName, tableName));
+    }
 }
diff --git 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonStore.java
 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonStore.java
index 59255dfe..767f1b08 100644
--- 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonStore.java
+++ 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonStore.java
@@ -19,10 +19,15 @@
 
 package org.apache.geaflow.store.paimon;
 
+import static 
org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE;
+import static 
org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE;
+
 import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
+import org.apache.geaflow.common.exception.GeaflowRuntimeException;
 import org.apache.geaflow.store.IStatefulStore;
 import org.apache.geaflow.store.api.graph.BaseGraphStore;
 import org.apache.geaflow.store.context.StoreContext;
+import org.apache.paimon.catalog.Catalog.TableNotExistException;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
@@ -42,19 +47,25 @@ public abstract class BasePaimonStore extends 
BaseGraphStore implements IStatefu
     protected static final String DIRECTION_COLUMN_NAME = "direction";
     protected static final String LABEL_COLUMN_NAME = "label";
 
-    protected PaimonTableCatalogClient client;
+    protected PaimonCatalogClient client;
     protected int shardId;
     protected String jobName;
     protected String paimonStoreName;
     protected long lastCheckpointId;
+    protected boolean isDistributedMode;
+    protected boolean enableAutoCreate;
 
     @Override
     public void init(StoreContext storeContext) {
         this.shardId = storeContext.getShardId();
         this.jobName = 
storeContext.getConfig().getString(ExecutionConfigKeys.JOB_APP_NAME);
+        // Default store name is "<jobName>#<shardId>".
         this.paimonStoreName = this.jobName + "#" + this.shardId;
-        this.client = new PaimonTableCatalogClient(storeContext.getConfig());
+        // The catalog client is obtained from PaimonCatalogManager rather than
+        // constructed directly in this store.
+        this.client = 
PaimonCatalogManager.getCatalogClient(storeContext.getConfig());
         this.lastCheckpointId = Long.MAX_VALUE;
+        // Flags read from config: distributed write mode, and whether tables that
+        // do not exist may be created by createKVTableHandle.
+        this.isDistributedMode = storeContext.getConfig()
+            .getBoolean(PAIMON_STORE_DISTRIBUTED_MODE_ENABLE);
+        this.enableAutoCreate = storeContext.getConfig()
+            .getBoolean(PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE);
     }
 
     @Override
@@ -68,13 +79,23 @@ public abstract class BasePaimonStore extends 
BaseGraphStore implements IStatefu
     }
 
     protected PaimonTableRWHandle createKVTableHandle(Identifier identifier) {
-        Schema.Builder schemaBuilder = Schema.newBuilder();
-        schemaBuilder.primaryKey(KEY_COLUMN_NAME);
-        schemaBuilder.column(KEY_COLUMN_NAME, DataTypes.BYTES());
-        schemaBuilder.column(VALUE_COLUMN_NAME, DataTypes.BYTES());
-        Schema schema = schemaBuilder.build();
-        Table vertexTable = this.client.createTable(schema, identifier);
-        return new PaimonTableRWHandle(identifier, vertexTable);
+        // Look the table up first; it is only created when the lookup reports it
+        // missing and auto-create is enabled.
+        Table vertexTable;
+        try {
+            vertexTable = this.client.getTable(identifier);
+        } catch (TableNotExistException e) {
+            if (enableAutoCreate) {
+                // KV layout: BYTES key column (primary key) plus BYTES value column.
+                Schema.Builder schemaBuilder = Schema.newBuilder();
+                schemaBuilder.primaryKey(KEY_COLUMN_NAME);
+                schemaBuilder.column(KEY_COLUMN_NAME, DataTypes.BYTES());
+                schemaBuilder.column(VALUE_COLUMN_NAME, DataTypes.BYTES());
+                Schema schema = schemaBuilder.build();
+                vertexTable = this.client.createTable(schema, identifier);
+            } else {
+                // Chain the catalog exception so the original lookup failure is
+                // not lost from the stack trace.
+                throw new GeaflowRuntimeException(
+                    "Table " + identifier + " not exist.", e);
+            }
+        }
+
+        return new PaimonTableRWHandle(identifier, vertexTable, shardId, isDistributedMode);
     }
 
     protected PaimonTableRWHandle createEdgeTableHandle(Identifier identifier) 
{
@@ -92,7 +113,7 @@ public abstract class BasePaimonStore extends BaseGraphStore 
implements IStatefu
         schemaBuilder.column(VALUE_COLUMN_NAME, DataTypes.BYTES());
         Schema schema = schemaBuilder.build();
         Table vertexTable = this.client.createTable(schema, identifier);
-        return new PaimonTableRWHandle(identifier, vertexTable);
+        return new PaimonTableRWHandle(identifier, vertexTable, shardId);
     }
 
     protected PaimonTableRWHandle createVertexTableHandle(Identifier 
identifier) {
@@ -104,6 +125,6 @@ public abstract class BasePaimonStore extends 
BaseGraphStore implements IStatefu
         schemaBuilder.column(VALUE_COLUMN_NAME, DataTypes.BYTES());
         Schema schema = schemaBuilder.build();
         Table vertexTable = this.client.createTable(schema, identifier);
-        return new PaimonTableRWHandle(identifier, vertexTable);
+        return new PaimonTableRWHandle(identifier, vertexTable, shardId);
     }
 }
diff --git 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/DynamicGraphPaimonStoreBase.java
 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/DynamicGraphPaimonStoreBase.java
index 8ad3bfba..9c4d202d 100644
--- 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/DynamicGraphPaimonStoreBase.java
+++ 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/DynamicGraphPaimonStoreBase.java
@@ -34,7 +34,6 @@ import org.apache.geaflow.store.api.graph.IDynamicGraphStore;
 import org.apache.geaflow.store.context.StoreContext;
 import org.apache.geaflow.store.paimon.proxy.IGraphMultiVersionedPaimonProxy;
 import org.apache.geaflow.store.paimon.proxy.PaimonProxyBuilder;
-import org.apache.paimon.catalog.Identifier;
 
 public class DynamicGraphPaimonStoreBase<K, VV, EV> extends 
BasePaimonGraphStore implements
     IDynamicGraphStore<K, VV, EV> {
@@ -47,17 +46,9 @@ public class DynamicGraphPaimonStoreBase<K, VV, EV> extends 
BasePaimonGraphStore
         int[] projection = new int[]{KEY_COLUMN_INDEX, VALUE_COLUMN_INDEX};
 
         // TODO: Use graph schema to create table instead of KV table.
-        String vertexTableName = String.format("%s#%s", "vertex", shardId);
-        Identifier vertexIdentifier = new Identifier(paimonStoreName, 
vertexTableName);
-        PaimonTableRWHandle vertexHandle = 
createKVTableHandle(vertexIdentifier);
-
-        String vertexIndexTableName = String.format("%s#%s", "vertex_index", 
shardId);
-        Identifier vertexIndexIdentifier = new Identifier(paimonStoreName, 
vertexIndexTableName);
-        PaimonTableRWHandle vertexIndexHandle = 
createKVTableHandle(vertexIndexIdentifier);
-
-        String edgeTableName = String.format("%s#%s", "edge", shardId);
-        Identifier edgeIdentifier = new Identifier(paimonStoreName, 
edgeTableName);
-        PaimonTableRWHandle edgeHandle = createKVTableHandle(edgeIdentifier);
+        PaimonTableRWHandle vertexHandle = createVertexTable(shardId);
+        PaimonTableRWHandle vertexIndexHandle = createIndexTable(shardId);
+        PaimonTableRWHandle edgeHandle = createEdgeTable(shardId);
 
         IGraphKVEncoder<K, VV, EV> encoder = 
GraphKVEncoderFactory.build(storeContext.getConfig(),
             storeContext.getGraphSchema());
diff --git 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/KVPaimonStore.java
 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/KVPaimonStore.java
index 718f0f7c..e2c02841 100644
--- 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/KVPaimonStore.java
+++ 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/KVPaimonStore.java
@@ -102,14 +102,14 @@ public class KVPaimonStore<K, V> extends BasePaimonStore 
implements IKVStatefulS
         byte[] keyArray = this.kvSerializer.serializeKey(key);
         byte[] valueArray = this.kvSerializer.serializeValue(value);
         GenericRow record = GenericRow.of(keyArray, valueArray);
-        this.tableHandle.write(record, 0);
+        this.tableHandle.write(record);
     }
 
     @Override
     public void remove(K key) {
         byte[] keyArray = this.kvSerializer.serializeKey(key);
         GenericRow record = GenericRow.ofKind(RowKind.DELETE, keyArray, null);
-        this.tableHandle.write(record, 0);
+        this.tableHandle.write(record);
     }
 
     @Override
diff --git 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableCatalogClient.java
 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogClient.java
similarity index 59%
rename from 
geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableCatalogClient.java
rename to 
geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogClient.java
index 12391880..f4d163ed 100644
--- 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableCatalogClient.java
+++ 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogClient.java
@@ -19,77 +19,46 @@
 
 package org.apache.geaflow.store.paimon;
 
-import static 
org.apache.geaflow.store.paimon.PaimonConfigKeys.PAIMON_OPTIONS_META_STORE;
-import static 
org.apache.geaflow.store.paimon.PaimonConfigKeys.PAIMON_OPTIONS_WAREHOUSE;
-
 import org.apache.geaflow.common.config.Configuration;
 import org.apache.geaflow.common.exception.GeaflowRuntimeException;
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Catalog.TableNotExistException;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PaimonTableCatalogClient {
+public class PaimonCatalogClient {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(PaimonTableCatalogClient.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PaimonCatalogClient.class);
 
     private final Configuration config;
 
     private Catalog catalog;
 
-    public PaimonTableCatalogClient(Configuration config) {
+    public PaimonCatalogClient(Catalog catalog, Configuration config) {
         this.config = config;
+        this.catalog = catalog;
     }
 
     public Table createTable(Schema schema, Identifier identifier) {
-        this.catalog = createCatalog(config);
         try {
+            LOGGER.info("create table {}", identifier.getFullName());
             this.catalog.createDatabase(identifier.getDatabaseName(), true);
             this.catalog.createTable(identifier, schema, true);
+            return getTable(identifier);
         } catch (Exception e) {
             throw new GeaflowRuntimeException("Create database or table 
failed.", e);
         }
-        return getTable(identifier);
-    }
-
-    private Catalog createCatalog(Configuration config) {
-        Options options = new Options();
-
-        String metastore = config.getString(PAIMON_OPTIONS_META_STORE);
-        String warehouse = config.getString(PAIMON_OPTIONS_WAREHOUSE);
-
-        options.set(CatalogOptions.WAREHOUSE, warehouse.toLowerCase());
-        options.set(CatalogOptions.METASTORE, metastore.toLowerCase());
-        if (!metastore.equalsIgnoreCase("filesystem")) {
-            throw new UnsupportedOperationException("Not support meta store 
type " + metastore);
-        }
-
-        if (!warehouse.startsWith("file://")) {
-            throw new UnsupportedOperationException(
-                "Only support warehouse path prefix: [file://]");
-        }
-
-        CatalogContext context = CatalogContext.create(options);
-        return CatalogFactory.createCatalog(context);
     }
 
     public Catalog getCatalog() {
         return this.catalog;
     }
 
-    public Table getTable(Identifier identifier) {
-        try {
-            return this.catalog.getTable(identifier);
-        } catch (Catalog.TableNotExistException e) {
-            // do something
-            throw new GeaflowRuntimeException("table not exist", e);
-        }
+    public Table getTable(Identifier identifier) throws TableNotExistException 
{
+        return this.catalog.getTable(identifier);
     }
 
     public void close() {
diff --git 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogManager.java
 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogManager.java
new file mode 100644
index 00000000..b8294fc0
--- /dev/null
+++ 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogManager.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.paimon;
+
+import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_META_STORE;
+import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_WAREHOUSE;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.store.paimon.config.PaimonStoreConfig;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates and caches one {@link PaimonCatalogClient} per warehouse path.
+ */
+public class PaimonCatalogManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PaimonCatalogManager.class);
+
+    private static final Map<String, PaimonCatalogClient> catalogMap = new ConcurrentHashMap<>();
+
+    private PaimonCatalogManager() {
+        // Static factory only.
+    }
+
+    /**
+     * Returns the catalog client of the configured warehouse, creating it on first use.
+     *
+     * @param config configuration providing the warehouse, the meta store type and any
+     *     extra Paimon options
+     * @return the client shared by all callers that use the same warehouse
+     * @throws UnsupportedOperationException if the configured meta store is not supported
+     */
+    public static synchronized PaimonCatalogClient getCatalogClient(Configuration config) {
+        // Key the cache by the normalized path, i.e. exactly what the catalog is opened with.
+        String warehouse = normalize(config.getString(PAIMON_STORE_WAREHOUSE));
+        return catalogMap.computeIfAbsent(warehouse, k -> createCatalog(config));
+    }
+
+    /** Builds the catalog described by the configuration and wraps it in a client. */
+    private static PaimonCatalogClient createCatalog(Configuration config) {
+        String metastore = config.getString(PAIMON_STORE_META_STORE);
+        // Reject unsupported meta stores before any option is assembled or logged.
+        if (!metastore.equalsIgnoreCase("filesystem")) {
+            throw new UnsupportedOperationException("Not support meta store type " + metastore);
+        }
+        String warehouse = normalize(config.getString(PAIMON_STORE_WAREHOUSE));
+
+        Options options = new Options();
+        options.set(CatalogOptions.WAREHOUSE, warehouse);
+        options.set(CatalogOptions.METASTORE, normalize(metastore));
+        Map<String, String> extraOptions = PaimonStoreConfig.getPaimonOptions(config);
+        if (extraOptions != null) {
+            for (Map.Entry<String, String> entry : extraOptions.entrySet()) {
+                // Values may carry credentials (e.g. object store keys): log the key only.
+                LOGGER.info("add option: {}", entry.getKey());
+                options.set(entry.getKey(), entry.getValue());
+            }
+        }
+
+        CatalogContext context = CatalogContext.create(options);
+        Catalog catalog = CatalogFactory.createCatalog(context);
+        return new PaimonCatalogClient(catalog, config);
+    }
+
+    /**
+     * Lower-cases a configuration value independently of the JVM default locale.
+     *
+     * <p>NOTE(review): this also lower-cases the warehouse path; confirm that is intended
+     * for case-sensitive file systems.
+     */
+    private static String normalize(String value) {
+        return value.toLowerCase(Locale.ROOT);
+    }
+}
diff --git 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonConfigKeys.java
 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonConfigKeys.java
deleted file mode 100644
index 294a9c1d..00000000
--- 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonConfigKeys.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.geaflow.store.paimon;
-
-import org.apache.geaflow.common.config.ConfigKey;
-import org.apache.geaflow.common.config.ConfigKeys;
-
-public class PaimonConfigKeys {
-
-    public static final ConfigKey PAIMON_OPTIONS_WAREHOUSE = ConfigKeys
-        .key("geaflow.store.paimon.options.warehouse")
-        .defaultValue("file:///tmp/paimon/")
-        .description("paimon warehouse, default LOCAL path, now support path 
prefix: "
-            + "[file://], Options for future: [hdfs://, oss://, s3://]");
-
-    public static final ConfigKey PAIMON_OPTIONS_META_STORE = ConfigKeys
-        .key("geaflow.store.paimon.options.meta.store")
-        .defaultValue("FILESYSTEM")
-        .description("Metastore of paimon catalog, now support [FILESYSTEM]. 
Options for future: "
-            + "[HIVE, JDBC].");
-
-    public static final ConfigKey PAIMON_OPTIONS_MEMTABLE_SIZE_MB = ConfigKeys
-        .key("geaflow.store.paimon.memtable.size.mb")
-        .defaultValue(128)
-        .description("paimon memtable size, default 256MB");
-}
diff --git 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableRWHandle.java
 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableRWHandle.java
index c72585b2..d88ecb1c 100644
--- 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableRWHandle.java
+++ 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableRWHandle.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.OptionalLong;
 import org.apache.geaflow.common.exception.GeaflowRuntimeException;
+import org.apache.geaflow.store.paimon.commit.PaimonCommitRegistry;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -31,27 +32,43 @@ import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.utils.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PaimonTableRWHandle {
 
-    private Identifier identifier;
-
-    private Table table;
-
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PaimonTableRWHandle.class);
+    private final int shardId;
+    private final Identifier identifier;
+    private final Table table;
+    private final boolean isDistributedMode;
+    private final PaimonCommitRegistry registry;
     private StreamTableWrite streamTableWrite;
-
     private List<CommitMessage> commitMessages = new ArrayList<>();
 
-    public PaimonTableRWHandle(Identifier identifier, Table table) {
+    public PaimonTableRWHandle(Identifier identifier, Table table, int 
shardId) {
+        this(identifier, table, shardId, false);
+    }
+
+    public PaimonTableRWHandle(Identifier identifier, Table table, int shardId,
+                               boolean isDistributedMode) {
+        this.shardId = shardId;
         this.identifier = identifier;
         this.table = table;
         this.streamTableWrite = table.newStreamWriteBuilder().newWrite();
+        this.registry = PaimonCommitRegistry.initInstance();
+        this.isDistributedMode = isDistributedMode;
+    }
+
+    public void write(GenericRow row) {
+        write(row, shardId);
     }
 
     public void write(GenericRow row, int bucket) {
@@ -63,13 +80,34 @@ public class PaimonTableRWHandle {
     }
 
     public void commit(long checkpointId) {
-        try (StreamTableCommit commit = 
table.newStreamWriteBuilder().newCommit()) {
-            flush(checkpointId);
-            commit.commit(checkpointId, commitMessages);
-            commitMessages.clear();
-        } catch (Exception e) {
-            throw new GeaflowRuntimeException("Failed to commit data into 
Paimon.", e);
+        commit(checkpointId, false);
+    }
+
+    public void commit(long checkpointId, boolean waitCompaction) {
+        flush(checkpointId, waitCompaction);
+        List<CommitMessage> messages = new ArrayList<>();
+        for (CommitMessage commitMessage : commitMessages) {
+            if (commitMessage instanceof CommitMessageImpl
+                && ((CommitMessageImpl) commitMessage).isEmpty()) {
+                continue;
+            }
+            messages.add(commitMessage);
         }
+
+        if (isDistributedMode) {
+            LOGGER.info("{} pre commit chkId:{} messages:{} wait:{}",
+                this.identifier, checkpointId, messages.size(), 
waitCompaction);
+            registry.addMessages(shardId, table.name(), messages);
+        } else {
+            LOGGER.info("{} commit chkId:{} messages:{} wait:{}",
+                this.identifier, checkpointId, messages.size(), 
waitCompaction);
+            try (StreamTableCommit commit = 
table.newStreamWriteBuilder().newCommit()) {
+                commit.commit(checkpointId, messages);
+            } catch (Exception e) {
+                throw new GeaflowRuntimeException("Failed to commit data into 
Paimon.", e);
+            }
+        }
+        commitMessages.clear();
     }
 
     public void rollbackTo(long snapshotId) {
@@ -92,6 +130,8 @@ public class PaimonTableRWHandle {
             if (predicate != null) {
                 readBuilder.withFilter(predicate);
             }
+            readBuilder.withBucketFilter(bucketId -> bucketId == shardId);
+
             List<Split> splits = readBuilder.newScan().plan().splits();
             TableRead tableRead = readBuilder.newRead();
             if (predicate != null) {
@@ -108,8 +148,13 @@ public class PaimonTableRWHandle {
     }
 
     public void flush(long checkpointIdentifier) {
+        flush(checkpointIdentifier, false);
+    }
+
+    public void flush(long checkpointIdentifier, boolean waitCompaction) {
         try {
-            this.commitMessages.addAll(streamTableWrite.prepareCommit(false, 
checkpointIdentifier));
+            this.commitMessages.addAll(
+                streamTableWrite.prepareCommit(waitCompaction, 
checkpointIdentifier));
         } catch (Exception e) {
             throw new GeaflowRuntimeException("Failed to flush data into 
Paimon.", e);
         }
diff --git 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/StaticGraphPaimonStoreBase.java
 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/StaticGraphPaimonStoreBase.java
index 42079eec..4b1dfe66 100644
--- 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/StaticGraphPaimonStoreBase.java
+++ 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/StaticGraphPaimonStoreBase.java
@@ -35,7 +35,6 @@ import org.apache.geaflow.store.api.graph.IStaticGraphStore;
 import org.apache.geaflow.store.context.StoreContext;
 import org.apache.geaflow.store.paimon.proxy.IGraphPaimonProxy;
 import org.apache.geaflow.store.paimon.proxy.PaimonProxyBuilder;
-import org.apache.paimon.catalog.Identifier;
 
 public class StaticGraphPaimonStoreBase<K, VV, EV> extends 
BasePaimonGraphStore implements
     IStaticGraphStore<K, VV, EV> {
@@ -49,14 +48,8 @@ public class StaticGraphPaimonStoreBase<K, VV, EV> extends 
BasePaimonGraphStore
         super.init(storeContext);
         int[] projection = new int[]{KEY_COLUMN_INDEX, VALUE_COLUMN_INDEX};
 
-        // TODO: Use graph schema to create table instead of KV table.
-        String vertexTableName = String.format("%s#%s", "vertex", shardId);
-        Identifier vertexIdentifier = new Identifier(paimonStoreName, 
vertexTableName);
-        PaimonTableRWHandle vertexHandle = 
createKVTableHandle(vertexIdentifier);
-
-        String edgeTableName = String.format("%s#%s", "edge", shardId);
-        Identifier edgeIdentifier = new Identifier(paimonStoreName, 
edgeTableName);
-        PaimonTableRWHandle edgeHandle = createKVTableHandle(edgeIdentifier);
+        PaimonTableRWHandle vertexHandle = createVertexTable(shardId);
+        PaimonTableRWHandle edgeHandle = createEdgeTable(shardId);
 
         this.sortAtom = storeContext.getGraphSchema().getEdgeAtoms().get(1);
         IGraphKVEncoder<K, VV, EV> encoder = 
GraphKVEncoderFactory.build(storeContext.getConfig(),
diff --git 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonCommitRegistry.java
 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonCommitRegistry.java
new file mode 100644
index 00000000..8468a410
--- /dev/null
+++ 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonCommitRegistry.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.paimon.commit;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.paimon.table.sink.CommitMessage;
+
+/**
+ * Process-wide buffer of Paimon commit messages, grouped by an integer index.
+ *
+ * <p>Producers buffer their messages with {@link #addMessages}; a later caller drains
+ * them per index with {@link #pollMessages}.
+ */
+public class PaimonCommitRegistry {
+
+    private static PaimonCommitRegistry instance;
+
+    private final ConcurrentHashMap<Integer, List<TaskCommitMessage>> index2CommitMessages =
+        new ConcurrentHashMap<>();
+
+    /**
+     * Returns the shared registry, creating it on first use.
+     */
+    public static synchronized PaimonCommitRegistry initInstance() {
+        if (instance == null) {
+            instance = new PaimonCommitRegistry();
+        }
+        return instance;
+    }
+
+    /**
+     * Returns the shared registry, or {@code null} if {@link #initInstance()} has not been
+     * called yet in this process.
+     */
+    public static synchronized PaimonCommitRegistry getInstance() {
+        return instance;
+    }
+
+    /**
+     * Buffers the commit messages produced for one table under the given index.
+     *
+     * @param index key the messages are grouped by
+     * @param tableName name of the table the messages belong to
+     * @param commitMessages messages to buffer; the list is stored as passed, not copied
+     */
+    public synchronized void addMessages(int index, String tableName,
+                                         List<CommitMessage> commitMessages) {
+        index2CommitMessages.computeIfAbsent(index, key -> new ArrayList<>())
+            .add(new TaskCommitMessage(tableName, commitMessages));
+    }
+
+    /**
+     * Removes and returns everything buffered for the given index.
+     *
+     * @param index key to drain
+     * @return a detached list of the drained entries; empty, never {@code null}, when nothing
+     *     is buffered
+     */
+    public synchronized List<TaskCommitMessage> pollMessages(int index) {
+        // Removing the entry hands the backing list over to the caller, so the registry's
+        // internal list is never shared and drained indexes do not pile up in the map.
+        List<TaskCommitMessage> drained = index2CommitMessages.remove(index);
+        return drained == null ? new ArrayList<>() : drained;
+    }
+
+    /**
+     * Commit messages of a single table.
+     */
+    public static class TaskCommitMessage implements Serializable {
+        private final String tableName;
+        private final List<CommitMessage> messages;
+
+        public TaskCommitMessage(String tableName, List<CommitMessage> messages) {
+            this.tableName = tableName;
+            this.messages = messages;
+        }
+
+        public String getTableName() {
+            return tableName;
+        }
+
+        public List<CommitMessage> getMessages() {
+            return messages;
+        }
+    }
+}
diff --git 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonMessage.java
 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonMessage.java
new file mode 100644
index 00000000..686ef8f4
--- /dev/null
+++ 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonMessage.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.paimon.commit;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import org.apache.geaflow.common.exception.GeaflowRuntimeException;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serializable carrier for the Paimon commit messages of one table and checkpoint.
+ *
+ * <p>The messages are encoded with {@link CommitMessageSerializer} in the constructor and
+ * decoded lazily by {@link #getMessages()}; the decoded list itself is transient.
+ */
+public class PaimonMessage implements Serializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PaimonMessage.class);
+    private static final ThreadLocal<CommitMessageSerializer> CACHE = ThreadLocal.withInitial(
+        CommitMessageSerializer::new);
+
+    private transient List<CommitMessage> messages;
+    private byte[] messageBytes;
+    private int serializerVersion;
+    private String tableName;
+    private long chkId;
+
+    /**
+     * Encodes the given commit messages together with the serializer version used.
+     *
+     * @param chkId checkpoint id the messages belong to
+     * @param tableName name of the table the messages belong to
+     * @param messages commit messages to encode
+     * @throws GeaflowRuntimeException if the messages cannot be serialized
+     */
+    public PaimonMessage(long chkId, String tableName, List<CommitMessage> messages) {
+        this.chkId = chkId;
+        this.tableName = tableName;
+
+        CommitMessageSerializer serializer = CACHE.get();
+        this.serializerVersion = serializer.getVersion();
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try {
+            serializer.serializeList(messages, new DataOutputViewStreamWrapper(out));
+            this.messageBytes = out.toByteArray();
+            LOGGER.info("ser bytes: {}", messageBytes.length);
+        } catch (Exception e) {
+            LOGGER.error("serialize message error", e);
+            throw new GeaflowRuntimeException(e);
+        }
+    }
+
+    /**
+     * Returns the commit messages, decoding them from the stored bytes on first access.
+     *
+     * @return the decoded commit messages
+     * @throws GeaflowRuntimeException if the bytes are missing or cannot be deserialized
+     */
+    public List<CommitMessage> getMessages() {
+        if (messages == null) {
+            try {
+                // Fail with a descriptive cause instead of the bare NullPointerException
+                // ByteArrayInputStream would raise when the payload is missing.
+                byte[] payload = Objects.requireNonNull(messageBytes,
+                    "commit message bytes are missing for table " + tableName);
+                messages = CACHE.get().deserializeList(serializerVersion,
+                    new DataInputViewStreamWrapper(new ByteArrayInputStream(payload)));
+            } catch (Exception e) {
+                LOGGER.error("deserialize message error", e);
+                throw new GeaflowRuntimeException(e);
+            }
+        }
+        return messages;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public long getCheckpointId() {
+        return chkId;
+    }
+}
diff --git 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonConfigKeys.java
 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonConfigKeys.java
new file mode 100644
index 00000000..1e4c80b1
--- /dev/null
+++ 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonConfigKeys.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.paimon.config;
+
+import org.apache.geaflow.common.config.ConfigKey;
+import org.apache.geaflow.common.config.ConfigKeys;
+
+/**
+ * Configuration keys of the Paimon store.
+ */
+public class PaimonConfigKeys {
+
+    public static final ConfigKey PAIMON_STORE_WAREHOUSE = ConfigKeys
+        .key("geaflow.store.paimon.warehouse")
+        .defaultValue("file:///tmp/paimon/")
+        .description("paimon warehouse, default LOCAL path, now support path prefix: "
+            + "[file://], Options for future: [hdfs://, oss://, s3://]");
+
+    public static final ConfigKey PAIMON_STORE_META_STORE = ConfigKeys
+        .key("geaflow.store.paimon.meta.store")
+        .defaultValue("FILESYSTEM")
+        .description("Metastore of paimon catalog, now support [FILESYSTEM]. Options for future: "
+            + "[HIVE, JDBC].");
+
+    // Parsed by PaimonStoreConfig#getPaimonOptions as comma separated key=value pairs.
+    public static final ConfigKey PAIMON_STORE_OPTIONS = ConfigKeys
+        .key("geaflow.store.paimon.options")
+        .defaultValue("")
+        .description("extra paimon options, format: key1=value1,key2=value2");
+
+    public static final ConfigKey PAIMON_STORE_DATABASE = ConfigKeys
+        .key("geaflow.store.paimon.database")
+        .defaultValue("graph")
+        .description("paimon graph store database");
+
+    public static final ConfigKey PAIMON_STORE_VERTEX_TABLE = ConfigKeys
+        .key("geaflow.store.paimon.vertex.table")
+        .defaultValue("vertex")
+        .description("paimon graph store vertex table name");
+
+    public static final ConfigKey PAIMON_STORE_EDGE_TABLE = ConfigKeys
+        .key("geaflow.store.paimon.edge.table")
+        .defaultValue("edge")
+        .description("paimon graph store edge table name");
+
+    public static final ConfigKey PAIMON_STORE_INDEX_TABLE = ConfigKeys
+        .key("geaflow.store.paimon.index.table")
+        .defaultValue("index")
+        .description("paimon graph store index table name");
+
+    public static final ConfigKey PAIMON_STORE_DISTRIBUTED_MODE_ENABLE = ConfigKeys
+        .key("geaflow.store.paimon.distributed.mode.enable")
+        .defaultValue(true)
+        .description("paimon graph store distributed mode");
+
+    public static final ConfigKey PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE = ConfigKeys
+        .key("geaflow.store.paimon.table.auto.create")
+        .defaultValue(false)
+        .description("paimon graph store table auto create");
+
+}
diff --git 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonStoreConfig.java
 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonStoreConfig.java
new file mode 100644
index 00000000..e82da589
--- /dev/null
+++ 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonStoreConfig.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.paimon.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.apache.geaflow.common.config.Configuration;
+
+/**
+ * Helpers for reading Paimon store settings from a GeaFlow {@link Configuration}.
+ */
+public class PaimonStoreConfig {
+
+    /**
+     * Parses {@link PaimonConfigKeys#PAIMON_STORE_OPTIONS} into a key/value map.
+     *
+     * <p>The raw value is a comma separated list of {@code key=value} pairs. Each pair is
+     * split on its first {@code '='} only, so a value may itself contain {@code '='}. Keys
+     * and values are trimmed; pairs without {@code '='}, or with an empty key or value,
+     * are skipped.
+     *
+     * @param config configuration to read the option string from
+     * @return the parsed options, or {@code null} when the option string is empty
+     */
+    public static Map<String, String> getPaimonOptions(Configuration config) {
+        String raw = config.getString(PaimonConfigKeys.PAIMON_STORE_OPTIONS, "");
+        if (StringUtils.isEmpty(raw)) {
+            // Callers receive null, not an empty map, when nothing is configured.
+            return null;
+        }
+        Map<String, String> options = new HashMap<>();
+        for (String pair : raw.split(",")) {
+            // Split on the first '=' only so values that contain '=' are kept intact.
+            int sep = pair.indexOf('=');
+            if (sep < 0) {
+                continue;
+            }
+            String key = pair.substring(0, sep).trim();
+            String value = pair.substring(sep + 1).trim();
+            if (key.isEmpty() || value.isEmpty()) {
+                continue;
+            }
+            options.put(key, value);
+        }
+        return options;
+    }
+
+}
diff --git 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonBaseGraphProxy.java
 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonBaseGraphProxy.java
index 2723988a..7fd237c3 100644
--- 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonBaseGraphProxy.java
+++ 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonBaseGraphProxy.java
@@ -80,14 +80,14 @@ public abstract class PaimonBaseGraphProxy<K, VV, EV> 
implements IGraphPaimonPro
     public void addEdge(IEdge<K, EV> edge) {
         Tuple<byte[], byte[]> tuple = 
this.encoder.getEdgeEncoder().format(edge);
         GenericRow record = GenericRow.of(tuple.f0, tuple.f1);
-        this.edgeHandle.write(record, 0);
+        this.edgeHandle.write(record);
     }
 
     @Override
     public void addVertex(IVertex<K, VV> vertex) {
         Tuple<byte[], byte[]> tuple = 
this.encoder.getVertexEncoder().format(vertex);
         GenericRow record = GenericRow.of(tuple.f0, tuple.f1);
-        this.vertexHandle.write(record, 0);
+        this.vertexHandle.write(record);
     }
 
     @Override
diff --git 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonGraphMultiVersionedProxy.java
 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonGraphMultiVersionedProxy.java
index 3d91262c..0e77dc2b 100644
--- 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonGraphMultiVersionedProxy.java
+++ 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonGraphMultiVersionedProxy.java
@@ -132,7 +132,7 @@ public class PaimonGraphMultiVersionedProxy<K, VV, EV> 
implements
         Tuple<byte[], byte[]> tuple = edgeEncoder.format(edge);
         byte[] bVersion = getBinaryVersion(version);
         GenericRow record = GenericRow.of(concat(bVersion, tuple.f0), 
tuple.f1);
-        this.edgeHandle.write(record, 0);
+        this.edgeHandle.write(record);
     }
 
     @Override
@@ -141,8 +141,8 @@ public class PaimonGraphMultiVersionedProxy<K, VV, EV> 
implements
         byte[] bVersion = getBinaryVersion(version);
         GenericRow record = GenericRow.of(concat(bVersion, tuple.f0), 
tuple.f1);
         GenericRow index = GenericRow.of(concat(tuple.f0, bVersion), 
EMPTY_BYTES);
-        this.vertexHandle.write(record, 0);
-        this.vertexIndexHandle.write(index, 0);
+        this.vertexHandle.write(record);
+        this.vertexIndexHandle.write(index);
     }
 
     @Override
diff --git 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/test/java/org/apache/geaflow/store/paimon/PaimonRWHandleTest.java
 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/test/java/org/apache/geaflow/store/paimon/PaimonRWHandleTest.java
index 4856d88c..a951c183 100644
--- 
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/test/java/org/apache/geaflow/store/paimon/PaimonRWHandleTest.java
+++ 
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/test/java/org/apache/geaflow/store/paimon/PaimonRWHandleTest.java
@@ -97,7 +97,7 @@ public class PaimonRWHandleTest {
             value.getBytes()       // value
         );
 
-        edgeHandle.write(row, 0);
+        edgeHandle.write(row);
         long checkpointId = 1L;
         edgeHandle.commit(checkpointId);
 
diff --git 
a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonDynamicGraphStateTest.java
 
b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonDynamicGraphStateTest.java
index 371598a9..2922d7e5 100644
--- 
a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonDynamicGraphStateTest.java
+++ 
b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonDynamicGraphStateTest.java
@@ -45,7 +45,7 @@ import org.apache.geaflow.state.data.OneDegreeGraph;
 import org.apache.geaflow.state.descriptor.GraphStateDescriptor;
 import org.apache.geaflow.state.pushdown.filter.IEdgeFilter;
 import org.apache.geaflow.state.pushdown.filter.IVertexFilter;
-import org.apache.geaflow.store.paimon.PaimonConfigKeys;
+import org.apache.geaflow.store.paimon.config.PaimonConfigKeys;
 import org.apache.geaflow.utils.keygroup.DefaultKeyGroupAssigner;
 import org.apache.geaflow.utils.keygroup.KeyGroup;
 import org.testng.Assert;
@@ -67,8 +67,9 @@ public class PaimonDynamicGraphStateTest {
         config.put(FileConfigKeys.PERSISTENT_TYPE.getKey(), "LOCAL");
         config.put(FileConfigKeys.ROOT.getKey(), "/tmp/geaflow/chk/");
         config.put(FileConfigKeys.JSON_CONFIG.getKey(), 
GsonUtil.toJson(persistConfig));
-        config.put(PaimonConfigKeys.PAIMON_OPTIONS_WAREHOUSE.getKey(),
+        config.put(PaimonConfigKeys.PAIMON_STORE_WAREHOUSE.getKey(),
             "file:///tmp/PaimonDynamicGraphStateTest/");
+        
config.put(PaimonConfigKeys.PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE.getKey(), 
"true");
     }
 
     @AfterClass
@@ -98,13 +99,23 @@ public class PaimonDynamicGraphStateTest {
 
     @Test
     public void testWriteRead() {
-        testApi(false);
+        Map<String, String> conf = new HashMap<>(config);
+        conf.put(StateConfigKeys.STATE_WRITE_ASYNC_ENABLE.getKey(), "false");
+        
conf.put(PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(), 
"false");
+        testApi(conf);
     }
 
-    private void testApi(boolean async) {
-        Map<String, String> conf = config;
+    @Test
+    public void testWriteRead2() {
+        Map<String, String> conf = new HashMap<>(config);
+        conf.put(StateConfigKeys.STATE_WRITE_ASYNC_ENABLE.getKey(), "false");
+        
conf.put(PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(), 
"false");
+        conf.put(PaimonConfigKeys.PAIMON_STORE_DATABASE.getKey(), "graph");
+        testApi(conf);
+    }
+
+    private void testApi(Map<String, String> conf) {
         conf.put(StateConfigKeys.STATE_WRITE_BUFFER_SIZE.getKey(), "100");
-        conf.put(StateConfigKeys.STATE_WRITE_ASYNC_ENABLE.getKey(), 
String.valueOf(async));
         GraphState<String, String, String> graphState = 
getGraphState(StringType.INSTANCE, "testApi", conf);
 
         graphState.manage().operate().setCheckpointId(1);
@@ -126,7 +137,6 @@ public class PaimonDynamicGraphStateTest {
         graphState.dynamicGraph().E().add(4L, new ValueEdge<>("1", "2", "6"));
         graphState.manage().operate().finish();
         graphState.manage().operate().archive();
-        ;
 
         List<IEdge<String, String>> list = 
graphState.dynamicGraph().E().query(1L, "1").asList();
         Assert.assertEquals(list.size(), 2);
@@ -191,7 +201,9 @@ public class PaimonDynamicGraphStateTest {
 
     @Test
     public void testKeyGroup() {
-        Map<String, String> conf = config;
+        Map<String, String> conf = new HashMap<>(config);
+        
conf.put(PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(), 
"false");
+        
conf.put(PaimonConfigKeys.PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE.getKey(), 
"true");
         GraphState<String, String, String> graphState = 
getGraphState(StringType.INSTANCE, "testKeyGroup", conf);
 
         graphState.manage().operate().setCheckpointId(1);
diff --git 
a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
 
b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
index 81937688..ed1305dc 100644
--- 
a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
+++ 
b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
@@ -19,13 +19,16 @@
 
 package org.apache.geaflow.state;
 
+import static 
org.apache.geaflow.common.config.keys.StateConfigKeys.STATE_WRITE_ASYNC_ENABLE;
+import static 
org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_DATABASE;
+import static 
org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_WAREHOUSE;
+
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.geaflow.common.config.Configuration;
 import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
-import org.apache.geaflow.common.config.keys.StateConfigKeys;
 import org.apache.geaflow.common.type.IType;
 import org.apache.geaflow.common.type.primitive.StringType;
 import org.apache.geaflow.common.utils.GsonUtil;
@@ -36,7 +39,7 @@ import org.apache.geaflow.model.graph.meta.GraphMeta;
 import org.apache.geaflow.model.graph.meta.GraphMetaType;
 import org.apache.geaflow.model.graph.vertex.impl.ValueVertex;
 import org.apache.geaflow.state.descriptor.GraphStateDescriptor;
-import org.apache.geaflow.store.paimon.PaimonConfigKeys;
+import org.apache.geaflow.store.paimon.config.PaimonConfigKeys;
 import org.apache.geaflow.utils.keygroup.DefaultKeyGroupAssigner;
 import org.apache.geaflow.utils.keygroup.KeyGroup;
 import org.testng.Assert;
@@ -58,8 +61,11 @@ public class PaimonGraphStateTest {
         config.put(FileConfigKeys.PERSISTENT_TYPE.getKey(), "LOCAL");
         config.put(FileConfigKeys.ROOT.getKey(), "/tmp/geaflow/chk/");
         config.put(FileConfigKeys.JSON_CONFIG.getKey(), 
GsonUtil.toJson(persistConfig));
-        config.put(PaimonConfigKeys.PAIMON_OPTIONS_WAREHOUSE.getKey(),
+        config.put(PaimonConfigKeys.PAIMON_STORE_WAREHOUSE.getKey(),
             "file:///tmp/PaimonGraphStateTest/");
+        
config.put(PaimonConfigKeys.PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE.getKey(), 
"true");
+        
config.put(PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(), 
"false");
+
     }
 
     @AfterClass
@@ -87,9 +93,7 @@ public class PaimonGraphStateTest {
         return graphState;
     }
 
-    public void testWriteRead(boolean async) {
-        Map<String, String> conf = new HashMap<>(config);
-        conf.put(StateConfigKeys.STATE_WRITE_ASYNC_ENABLE.getKey(), 
Boolean.toString(async));
+    public void testWriteRead(Map<String, String> conf) {
         GraphState<String, String, String> graphState = 
getGraphState(StringType.INSTANCE,
             "write_read", conf);
 
@@ -102,6 +106,7 @@ public class PaimonGraphStateTest {
             graphState.staticGraph().E().add(new ValueEdge<>("1", id, 
"edge_hello"));
         }
         // read nothing since not committed
+        boolean async = 
Boolean.parseBoolean(conf.get(STATE_WRITE_ASYNC_ENABLE.getKey()));
         if (!async) {
             Assert.assertNull(graphState.staticGraph().V().query("1").get());
             
Assert.assertEquals(graphState.staticGraph().E().query("1").asList().size(), 0);
@@ -150,8 +155,28 @@ public class PaimonGraphStateTest {
 
     @Test
     public void testBothWriteMode() {
-        testWriteRead(true);
-        testWriteRead(false);
+        Map<String, String> conf = new HashMap<>(config);
+        // First pass: asynchronous state writes.
+        conf.put(STATE_WRITE_ASYNC_ENABLE.getKey(), Boolean.TRUE.toString());
+        testWriteRead(conf);
+
+        // Second pass: synchronous state writes.
+        conf.put(STATE_WRITE_ASYNC_ENABLE.getKey(), Boolean.FALSE.toString());
+        testWriteRead(conf);
+    }
+
+    @Test
+    public void testBothWriteMode2() {
+        Map<String, String> conf = new HashMap<>(config);
+        // First pass: asynchronous writes with an explicit warehouse path and database name.
+        conf.put(STATE_WRITE_ASYNC_ENABLE.getKey(), Boolean.TRUE.toString());
+        conf.put(PAIMON_STORE_WAREHOUSE.getKey(), "/tmp/testBothWriteMode2");
+        conf.put(PAIMON_STORE_DATABASE.getKey(), "graph");
+        testWriteRead(conf);
+
+        // Second pass: same store settings with synchronous state writes.
+        conf.put(STATE_WRITE_ASYNC_ENABLE.getKey(), Boolean.FALSE.toString());
+        testWriteRead(conf);
     }
 
 }
diff --git 
a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonKeyStateTest.java
 
b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonKeyStateTest.java
index 0f254149..4d640b90 100644
--- 
a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonKeyStateTest.java
+++ 
b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonKeyStateTest.java
@@ -27,7 +27,7 @@ import org.apache.geaflow.common.config.Configuration;
 import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
 import org.apache.geaflow.file.FileConfigKeys;
 import org.apache.geaflow.state.descriptor.KeyMapStateDescriptor;
-import org.apache.geaflow.store.paimon.PaimonConfigKeys;
+import org.apache.geaflow.store.paimon.config.PaimonConfigKeys;
 import org.apache.geaflow.utils.keygroup.DefaultKeyGroupAssigner;
 import org.apache.geaflow.utils.keygroup.KeyGroup;
 import org.testng.Assert;
@@ -46,7 +46,10 @@ public class PaimonKeyStateTest {
         config.put(ExecutionConfigKeys.JOB_APP_NAME.getKey(), 
"PaimonKeyStateTest");
         config.put(FileConfigKeys.PERSISTENT_TYPE.getKey(), "LOCAL");
         config.put(FileConfigKeys.ROOT.getKey(), "/tmp/geaflow/chk/");
-        config.put(PaimonConfigKeys.PAIMON_OPTIONS_WAREHOUSE.getKey(), 
"file:///tmp/PaimonKeyStateTest/");
+        config.put(PaimonConfigKeys.PAIMON_STORE_WAREHOUSE.getKey(), 
"file:///tmp"
+            + "/PaimonKeyStateTest/");
+        
config.put(PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(), 
"false");
+        
config.put(PaimonConfigKeys.PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE.getKey(), 
"true");
     }
 
     @AfterClass
@@ -72,7 +75,7 @@ public class PaimonKeyStateTest {
         Assert.assertEquals(mapState.get("hello").size(), 0);
         // commit chk = 1, now be able to read data.
         mapState.manage().operate().archive();
-        Assert.assertEquals(mapState.get("hello").size(), 4);
+        Assert.assertEquals(mapState.get("hello").size(), 6);
 
         // set chk = 2
         mapState.manage().operate().setCheckpointId(2L);
@@ -81,7 +84,7 @@ public class PaimonKeyStateTest {
         conf2.put("conf2", "test");
         mapState.put("hello2", conf2);
         // cannot read data with chk = 2 since chk2 not committed.
-        Assert.assertEquals(mapState.get("hello").size(), 4);
+        Assert.assertEquals(mapState.get("hello").size(), 6);
         Assert.assertEquals(mapState.get("hello2").size(), 0);
 
         // commit chk = 2
@@ -89,8 +92,8 @@ public class PaimonKeyStateTest {
         mapState.manage().operate().archive();
 
         // now be able to read data
-        Assert.assertEquals(mapState.get("hello").size(), 4);
-        Assert.assertEquals(mapState.get("hello2").size(), 5);
+        Assert.assertEquals(mapState.get("hello").size(), 6);
+        Assert.assertEquals(mapState.get("hello2").size(), 7);
 
         // read data which not exists
         Assert.assertEquals(mapState.get("hello3").size(), 0);
diff --git a/geaflow/pom.xml b/geaflow/pom.xml
index 029b43fb..652f8d88 100644
--- a/geaflow/pom.xml
+++ b/geaflow/pom.xml
@@ -50,7 +50,7 @@
         <fastjson.version>1.2.71_noneautotype</fastjson.version>
         <hikaricp.version>4.0.3</hikaricp.version>
         <sqlite.version>3.40.0.0</sqlite.version>
-        <zstd.version>1.4.3-1</zstd.version>
+        <zstd.version>1.5.5-11</zstd.version>
         <rocksdb.version>7.7.3</rocksdb.version>
         <paimon.version>1.0.1</paimon.version>
         <snappy.version>1.1.8.4</snappy.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to