This is an automated email from the ASF dual-hosted git repository.

haodongtang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git


The following commit(s) were added to refs/heads/master by this push:
     new f832169c Support ASync-Write for PaimonGraphStore #577 (#578)
f832169c is described below

commit f832169cf69f89f6bcf2698667109feb52aff4d3
Author: Forest <[email protected]>
AuthorDate: Mon Aug 11 14:01:25 2025 +0800

    Support ASync-Write for PaimonGraphStore #577 (#578)
---
 .../paimon/proxy/AsyncPaimonGraphRWProxy.java      | 113 +++++++++++++++++++++
 .../store/paimon/proxy/PaimonProxyBuilder.java     |   8 +-
 .../apache/geaflow/state/PaimonGraphStateTest.java |  23 +++--
 3 files changed, 137 insertions(+), 7 deletions(-)

diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/AsyncPaimonGraphRWProxy.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/AsyncPaimonGraphRWProxy.java
new file mode 100644
index 00000000..d7b99025
--- /dev/null
+++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/AsyncPaimonGraphRWProxy.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.paimon.proxy;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.serialize.SerializerFactory;
+import org.apache.geaflow.model.graph.edge.IEdge;
+import org.apache.geaflow.model.graph.vertex.IVertex;
+import org.apache.geaflow.state.graph.encoder.IGraphKVEncoder;
+import org.apache.geaflow.state.pushdown.IStatePushDown;
+import org.apache.geaflow.state.pushdown.filter.inner.GraphFilter;
+import org.apache.geaflow.state.pushdown.filter.inner.IGraphFilter;
+import org.apache.geaflow.store.data.AsyncFlushBuffer;
+import org.apache.geaflow.store.data.GraphWriteBuffer;
+import org.apache.geaflow.store.paimon.PaimonTableRWHandle;
+
+public class AsyncPaimonGraphRWProxy<K, VV, EV> extends PaimonGraphRWProxy<K, VV, EV> {
+
+    private final AsyncFlushBuffer<K, VV, EV> flushBuffer;
+
+    public AsyncPaimonGraphRWProxy(PaimonTableRWHandle vertexHandle, PaimonTableRWHandle edgeHandle,
+                                   int[] projection, IGraphKVEncoder<K, VV, EV> encoder,
+                                   Configuration config) {
+        super(vertexHandle, edgeHandle, projection, encoder);
+        this.flushBuffer = new AsyncFlushBuffer<>(config, this::flush,
+            SerializerFactory.getKryoSerializer());
+    }
+
+    private void flush(GraphWriteBuffer<K, VV, EV> graphWriteBuffer) {
+        if (graphWriteBuffer.getSize() == 0) {
+            return;
+        }
+
+        Collection<IVertex<K, VV>> vertices = graphWriteBuffer.getVertexId2Vertex().values();
+        for (IVertex<K, VV> vertex : vertices) {
+            super.addVertex(vertex);
+        }
+
+        Collection<List<IEdge<K, EV>>> edgesList = graphWriteBuffer.getVertexId2Edges().values();
+        for (List<IEdge<K, EV>> edges : edgesList) {
+            for (IEdge<K, EV> edge : edges) {
+                super.addEdge(edge);
+            }
+        }
+
+        super.flush();
+    }
+
+    @Override
+    public void addVertex(IVertex<K, VV> vertex) {
+        this.flushBuffer.addVertex(vertex);
+    }
+
+    @Override
+    public void addEdge(IEdge<K, EV> edge) {
+        this.flushBuffer.addEdge(edge);
+    }
+
+    @Override
+    public IVertex<K, VV> getVertex(K sid, IStatePushDown pushdown) {
+        IVertex<K, VV> vertex = this.flushBuffer.readBufferedVertex(sid);
+        if (vertex != null) {
+            return ((IGraphFilter) pushdown.getFilter()).filterVertex(vertex) ? vertex : null;
+        }
+        return super.getVertex(sid, pushdown);
+    }
+
+    @Override
+    public List<IEdge<K, EV>> getEdges(K sid, IStatePushDown pushdown) {
+        List<IEdge<K, EV>> list = this.flushBuffer.readBufferedEdges(sid);
+        LinkedHashSet<IEdge<K, EV>> set = new LinkedHashSet<>();
+
+        IGraphFilter filter = GraphFilter.of(pushdown.getFilter(), pushdown.getEdgeLimit());
+        Lists.reverse(list).stream().filter(filter::filterEdge).forEach(set::add);
+        if (!filter.dropAllRemaining()) {
+            set.addAll(super.getEdges(sid, pushdown));
+        }
+
+        return new ArrayList<>(set);
+    }
+
+    @Override
+    public void flush() {
+        flushBuffer.flush();
+    }
+
+    @Override
+    public void close() {
+        flushBuffer.close();
+    }
+}
diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonProxyBuilder.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonProxyBuilder.java
index abf9dd53..faf22cad 100644
--- a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonProxyBuilder.java
+++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonProxyBuilder.java
@@ -20,6 +20,7 @@
 package org.apache.geaflow.store.paimon.proxy;
 
 import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.config.keys.StateConfigKeys;
 import org.apache.geaflow.state.graph.encoder.IGraphKVEncoder;
 import org.apache.geaflow.store.paimon.PaimonTableRWHandle;
 
@@ -31,7 +32,12 @@ public class PaimonProxyBuilder {
                                                                  int[] projection,
                                                                  IGraphKVEncoder<K, VV, EV> encoder) {
         // TODO: add readonly proxy.
-        return new PaimonGraphRWProxy<>(vertexRWHandle, edgeRWHandle, projection, encoder);
+        if (config.getBoolean(StateConfigKeys.STATE_WRITE_ASYNC_ENABLE)) {
+            return new AsyncPaimonGraphRWProxy<>(vertexRWHandle, edgeRWHandle, projection, encoder,
+                config);
+        } else {
+            return new PaimonGraphRWProxy<>(vertexRWHandle, edgeRWHandle, projection, encoder);
+        }
     }
 
     public static <K, VV, EV> IGraphMultiVersionedPaimonProxy<K, VV, EV> buildMultiVersioned(
diff --git a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
index cb380c64..81937688 100644
--- a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
+++ b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.geaflow.common.config.Configuration;
 import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
+import org.apache.geaflow.common.config.keys.StateConfigKeys;
 import org.apache.geaflow.common.type.IType;
 import org.apache.geaflow.common.type.primitive.StringType;
 import org.apache.geaflow.common.utils.GsonUtil;
@@ -86,9 +87,9 @@ public class PaimonGraphStateTest {
         return graphState;
     }
 
-    @Test
-    public void testWriteRead() {
+    public void testWriteRead(boolean async) {
         Map<String, String> conf = new HashMap<>(config);
+        conf.put(StateConfigKeys.STATE_WRITE_ASYNC_ENABLE.getKey(), Boolean.toString(async));
         GraphState<String, String, String> graphState = getGraphState(StringType.INSTANCE,
             "write_read", conf);
 
@@ -101,8 +102,10 @@ public class PaimonGraphStateTest {
             graphState.staticGraph().E().add(new ValueEdge<>("1", id, "edge_hello"));
         }
         // read nothing since not committed
-        Assert.assertNull(graphState.staticGraph().V().query("1").get());
-        Assert.assertEquals(graphState.staticGraph().E().query("1").asList().size(), 0);
+        if (!async) {
+            Assert.assertNull(graphState.staticGraph().V().query("1").get());
+            Assert.assertEquals(graphState.staticGraph().E().query("1").asList().size(), 0);
+        }
         // commit chk = 1, now be able to read data
         graphState.manage().operate().archive();
         Assert.assertNotNull(graphState.staticGraph().V().query("1").get());
@@ -119,8 +122,10 @@ public class PaimonGraphStateTest {
         // be not able to read data with chk = 2 since not committed.
         Assert.assertNotNull(graphState.staticGraph().V().query("1").get());
         Assert.assertEquals(graphState.staticGraph().E().query("1").asList().size(), 100);
-        Assert.assertNull(graphState.staticGraph().V().query("2").get());
-        Assert.assertEquals(graphState.staticGraph().E().query("2").asList().size(), 0);
+        if (!async) {
+            Assert.assertNull(graphState.staticGraph().V().query("2").get());
+            Assert.assertEquals(graphState.staticGraph().E().query("2").asList().size(), 0);
+        }
         // commit chk = 2, now be able to read data
         graphState.manage().operate().archive();
         Assert.assertNotNull(graphState.staticGraph().V().query("1").get());
@@ -143,4 +148,10 @@ public class PaimonGraphStateTest {
         graphState.manage().operate().drop();
     }
 
+    @Test
+    public void testBothWriteMode() {
+        testWriteRead(true);
+        testWriteRead(false);
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to