This is an automated email from the ASF dual-hosted git repository.
haodongtang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git
The following commit(s) were added to refs/heads/master by this push:
new f832169c Support ASync-Write for PaimonGraphStore #577 (#578)
f832169c is described below
commit f832169cf69f89f6bcf2698667109feb52aff4d3
Author: Forest <[email protected]>
AuthorDate: Mon Aug 11 14:01:25 2025 +0800
Support ASync-Write for PaimonGraphStore #577 (#578)
---
.../paimon/proxy/AsyncPaimonGraphRWProxy.java | 113 +++++++++++++++++++++
.../store/paimon/proxy/PaimonProxyBuilder.java | 8 +-
.../apache/geaflow/state/PaimonGraphStateTest.java | 23 +++--
3 files changed, 137 insertions(+), 7 deletions(-)
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/AsyncPaimonGraphRWProxy.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/AsyncPaimonGraphRWProxy.java
new file mode 100644
index 00000000..d7b99025
--- /dev/null
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/AsyncPaimonGraphRWProxy.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.paimon.proxy;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.serialize.SerializerFactory;
+import org.apache.geaflow.model.graph.edge.IEdge;
+import org.apache.geaflow.model.graph.vertex.IVertex;
+import org.apache.geaflow.state.graph.encoder.IGraphKVEncoder;
+import org.apache.geaflow.state.pushdown.IStatePushDown;
+import org.apache.geaflow.state.pushdown.filter.inner.GraphFilter;
+import org.apache.geaflow.state.pushdown.filter.inner.IGraphFilter;
+import org.apache.geaflow.store.data.AsyncFlushBuffer;
+import org.apache.geaflow.store.data.GraphWriteBuffer;
+import org.apache.geaflow.store.paimon.PaimonTableRWHandle;
+
+public class AsyncPaimonGraphRWProxy<K, VV, EV> extends PaimonGraphRWProxy<K,
VV, EV> { // PaimonGraphRWProxy variant that routes addVertex/addEdge through an AsyncFlushBuffer and consults that buffer first on reads.
+
+ private final AsyncFlushBuffer<K, VV, EV> flushBuffer; // Holds vertices/edges that were added but not yet written through the parent proxy.
+
+ public AsyncPaimonGraphRWProxy(PaimonTableRWHandle vertexHandle,
PaimonTableRWHandle edgeHandle,
+ int[] projection, IGraphKVEncoder<K, VV,
EV> encoder,
+ Configuration config) { // Same arguments as the parent constructor plus the config used to build the flush buffer.
+ super(vertexHandle, edgeHandle, projection, encoder);
+ this.flushBuffer = new AsyncFlushBuffer<>(config, this::flush,
+ SerializerFactory.getKryoSerializer()); // The private flush(GraphWriteBuffer) below is registered as the buffer's flush callback.
+ }
+
+ private void flush(GraphWriteBuffer<K, VV, EV> graphWriteBuffer) { // Flush callback: writes one buffered batch through the parent proxy. Presumably invoked by AsyncFlushBuffer off the caller thread - TODO confirm.
+ if (graphWriteBuffer.getSize() == 0) {
+ return; // Nothing buffered: skip the parent writes and the parent flush entirely.
+ }
+
+ Collection<IVertex<K, VV>> vertices =
graphWriteBuffer.getVertexId2Vertex().values();
+ for (IVertex<K, VV> vertex : vertices) {
+ super.addVertex(vertex); // super. is required: this.addVertex would put the vertex back into the buffer.
+ }
+
+ Collection<List<IEdge<K, EV>>> edgesList =
graphWriteBuffer.getVertexId2Edges().values();
+ for (List<IEdge<K, EV>> edges : edgesList) {
+ for (IEdge<K, EV> edge : edges) {
+ super.addEdge(edge); // Same reason as above: bypass the buffering override.
+ }
+ }
+
+ super.flush(); // Parent flush is issued once per batch, after all vertices and edges were handed over.
+ }
+
+ @Override
+ public void addVertex(IVertex<K, VV> vertex) { // Buffers the vertex instead of writing it through the parent immediately.
+ this.flushBuffer.addVertex(vertex);
+ }
+
+ @Override
+ public void addEdge(IEdge<K, EV> edge) { // Buffers the edge instead of writing it through the parent immediately.
+ this.flushBuffer.addEdge(edge);
+ }
+
+ @Override
+ public IVertex<K, VV> getVertex(K sid, IStatePushDown pushdown) { // Returns the buffered vertex for sid if one exists (subject to the pushdown filter), otherwise defers to the parent.
+ IVertex<K, VV> vertex = this.flushBuffer.readBufferedVertex(sid);
+ if (vertex != null) {
+ return ((IGraphFilter) pushdown.getFilter()).filterVertex(vertex)
? vertex : null; // A buffered vertex rejected by the filter yields null; the parent is NOT consulted. NOTE(review): the cast assumes getFilter() always returns an IGraphFilter - confirm.
+ }
+ return super.getVertex(sid, pushdown);
+ }
+
+ @Override
+ public List<IEdge<K, EV>> getEdges(K sid, IStatePushDown pushdown) { // Merges buffered edges for sid with the parent's edges, de-duplicated, buffered ones first.
+ List<IEdge<K, EV>> list = this.flushBuffer.readBufferedEdges(sid);
+ LinkedHashSet<IEdge<K, EV>> set = new LinkedHashSet<>(); // Set semantics drop duplicates (by edge equals/hashCode) while keeping first-insertion order.
+
+ IGraphFilter filter = GraphFilter.of(pushdown.getFilter(),
pushdown.getEdgeLimit()); // Filter built from the pushdown filter plus its edge limit.
+
Lists.reverse(list).stream().filter(filter::filterEdge).forEach(set::add); // Buffered edges are visited in reverse list order (presumably newest first - confirm). NOTE(review): if the limit-aware filter is stateful, this relies on sequential stream evaluation order.
+ if (!filter.dropAllRemaining()) {
+ set.addAll(super.getEdges(sid, pushdown)); // Parent edges are appended only when the filter does not report that everything remaining should be dropped.
+ }
+
+ return new ArrayList<>(set);
+ }
+
+ @Override
+ public void flush() { // Delegates to the buffer's flush rather than calling super.flush() directly.
+ flushBuffer.flush();
+ }
+
+ @Override
+ public void close() { // Closes the flush buffer only. NOTE(review): super.close() is not invoked - confirm the parent holds nothing that needs closing.
+ flushBuffer.close();
+ }
+}
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonProxyBuilder.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonProxyBuilder.java
index abf9dd53..faf22cad 100644
---
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonProxyBuilder.java
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonProxyBuilder.java
@@ -20,6 +20,7 @@
package org.apache.geaflow.store.paimon.proxy;
import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.config.keys.StateConfigKeys;
import org.apache.geaflow.state.graph.encoder.IGraphKVEncoder;
import org.apache.geaflow.store.paimon.PaimonTableRWHandle;
@@ -31,7 +32,12 @@ public class PaimonProxyBuilder {
int[]
projection,
IGraphKVEncoder<K, VV, EV> encoder) {
// TODO: add readonly proxy.
- return new PaimonGraphRWProxy<>(vertexRWHandle, edgeRWHandle,
projection, encoder);
+ if (config.getBoolean(StateConfigKeys.STATE_WRITE_ASYNC_ENABLE)) {
+ return new AsyncPaimonGraphRWProxy<>(vertexRWHandle, edgeRWHandle,
projection, encoder,
+ config);
+ } else {
+ return new PaimonGraphRWProxy<>(vertexRWHandle, edgeRWHandle,
projection, encoder);
+ }
}
public static <K, VV, EV> IGraphMultiVersionedPaimonProxy<K, VV, EV>
buildMultiVersioned(
diff --git
a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
index cb380c64..81937688 100644
---
a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
+++
b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
+import org.apache.geaflow.common.config.keys.StateConfigKeys;
import org.apache.geaflow.common.type.IType;
import org.apache.geaflow.common.type.primitive.StringType;
import org.apache.geaflow.common.utils.GsonUtil;
@@ -86,9 +87,9 @@ public class PaimonGraphStateTest {
return graphState;
}
- @Test
- public void testWriteRead() {
+ public void testWriteRead(boolean async) {
Map<String, String> conf = new HashMap<>(config);
+ conf.put(StateConfigKeys.STATE_WRITE_ASYNC_ENABLE.getKey(),
Boolean.toString(async));
GraphState<String, String, String> graphState =
getGraphState(StringType.INSTANCE,
"write_read", conf);
@@ -101,8 +102,10 @@ public class PaimonGraphStateTest {
graphState.staticGraph().E().add(new ValueEdge<>("1", id,
"edge_hello"));
}
// read nothing since not committed
- Assert.assertNull(graphState.staticGraph().V().query("1").get());
-
Assert.assertEquals(graphState.staticGraph().E().query("1").asList().size(), 0);
+ if (!async) {
+ Assert.assertNull(graphState.staticGraph().V().query("1").get());
+
Assert.assertEquals(graphState.staticGraph().E().query("1").asList().size(), 0);
+ }
// commit chk = 1, now be able to read data
graphState.manage().operate().archive();
Assert.assertNotNull(graphState.staticGraph().V().query("1").get());
@@ -119,8 +122,10 @@ public class PaimonGraphStateTest {
// be not able to read data with chk = 2 since not committed.
Assert.assertNotNull(graphState.staticGraph().V().query("1").get());
Assert.assertEquals(graphState.staticGraph().E().query("1").asList().size(),
100);
- Assert.assertNull(graphState.staticGraph().V().query("2").get());
-
Assert.assertEquals(graphState.staticGraph().E().query("2").asList().size(), 0);
+ if (!async) {
+ Assert.assertNull(graphState.staticGraph().V().query("2").get());
+
Assert.assertEquals(graphState.staticGraph().E().query("2").asList().size(), 0);
+ }
// commit chk = 2, now be able to read data
graphState.manage().operate().archive();
Assert.assertNotNull(graphState.staticGraph().V().query("1").get());
@@ -143,4 +148,10 @@ public class PaimonGraphStateTest {
graphState.manage().operate().drop();
}
+ @Test
+ public void testBothWriteMode() { // Single test entry point that runs the write/read scenario once per write mode.
+ testWriteRead(true); // STATE_WRITE_ASYNC_ENABLE = true
+ testWriteRead(false); // STATE_WRITE_ASYNC_ENABLE = false
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]