This is an automated email from the ASF dual-hosted git repository.
loogn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git
The following commit(s) were added to refs/heads/master by this push:
     new 5307fcb0 [ISSUE-622] feat: support distributed writing with one Paimon graph store (#623)
5307fcb0 is described below
commit 5307fcb0f359915057d2b4d1426428c3cc3e4569
Author: Qingwen Zhao <[email protected]>
AuthorDate: Thu Sep 25 15:11:53 2025 +0800
[ISSUE-622] feat: support distributed writing with one Paimon graph store (#623)
* feat: support paimon store in distributed mode
* update
* fix checkstyle
* update
* fix ut
* update
* disable dynamic traversal with data appending
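In summary: with a Paimon backend, each materialize task now only prepares Paimon commit messages during checkpointing, and a single parallelism-1 global sink performs the actual table commit, so all shards can write into one shared Paimon graph store. For context, a minimal sketch of the Paimon two-phase write/commit API this split builds on (the commit user, rows and checkpoint id are illustrative, not taken from the patch):

    import java.util.List;
    import org.apache.paimon.data.GenericRow;
    import org.apache.paimon.table.Table;
    import org.apache.paimon.table.sink.CommitMessage;
    import org.apache.paimon.table.sink.StreamTableCommit;
    import org.apache.paimon.table.sink.StreamTableWrite;
    import org.apache.paimon.table.sink.StreamWriteBuilder;

    public class TwoPhaseWriteSketch {

        // Phase 1 (every writer task): write rows, then prepare commit
        // messages instead of committing locally.
        static List<CommitMessage> prepare(Table table, long checkpointId) throws Exception {
            StreamWriteBuilder builder = table.newStreamWriteBuilder().withCommitUser("job-name");
            try (StreamTableWrite write = builder.newWrite()) {
                write.write(GenericRow.of(new byte[]{1}, new byte[]{2}));
                return write.prepareCommit(false, checkpointId);
            }
        }

        // Phase 2 (the single global committer): commit all collected
        // messages for the checkpoint in one table snapshot.
        static void commit(Table table, long checkpointId, List<CommitMessage> messages)
            throws Exception {
            StreamWriteBuilder builder = table.newStreamWriteBuilder().withCommitUser("job-name");
            try (StreamTableCommit commit = builder.newCommit()) {
                commit.commit(checkpointId, messages);
            }
        }
    }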
---
.../java/org/apache/geaflow/view/IViewDesc.java | 2 +
.../graph/materialize/GraphViewMaterializeOp.java | 33 ++++-
.../impl/graph/materialize/PaimonGlobalSink.java | 139 +++++++++++++++++++
.../view/materialize/MaterializedIncGraph.java | 16 ++-
.../pdata/stream/window/WindowStreamSink.java | 3 +-
.../apache/geaflow/plan/PipelinePlanBuilder.java | 153 +++++++++++++++------
.../org/apache/geaflow/plan/PipelinePlanTest.java | 32 +++--
.../dsl/runtime/engine/GeaFlowRuntimeGraph.java | 9 ++
.../geaflow/dsl/runtime/query/GQLInsertTest.java | 22 +++
.../resources/expect/gql_insert_and_graph_006.txt | 0
.../resources/query/gql_insert_and_graph_006.sql | 70 ++++++++++
.../resources/query/gql_insert_and_graph_007.sql | 75 ++++++++++
.../geaflow/store/paimon/BasePaimonGraphStore.java | 54 +++++++-
.../geaflow/store/paimon/BasePaimonStore.java | 43 ++++--
.../store/paimon/DynamicGraphPaimonStoreBase.java | 15 +-
.../apache/geaflow/store/paimon/KVPaimonStore.java | 4 +-
...CatalogClient.java => PaimonCatalogClient.java} | 49 ++-----
.../geaflow/store/paimon/PaimonCatalogManager.java | 71 ++++++++++
.../geaflow/store/paimon/PaimonConfigKeys.java | 43 ------
.../geaflow/store/paimon/PaimonTableRWHandle.java | 71 ++++++++--
.../store/paimon/StaticGraphPaimonStoreBase.java | 11 +-
.../store/paimon/commit/PaimonCommitRegistry.java | 80 +++++++++++
.../geaflow/store/paimon/commit/PaimonMessage.java | 89 ++++++++++++
.../store/paimon/config/PaimonConfigKeys.java | 74 ++++++++++
.../store/paimon/config/PaimonStoreConfig.java | 46 +++++++
.../store/paimon/proxy/PaimonBaseGraphProxy.java | 4 +-
.../proxy/PaimonGraphMultiVersionedProxy.java | 6 +-
.../geaflow/store/paimon/PaimonRWHandleTest.java | 2 +-
.../geaflow/state/PaimonDynamicGraphStateTest.java | 28 ++--
.../apache/geaflow/state/PaimonGraphStateTest.java | 37 +++--
.../apache/geaflow/state/PaimonKeyStateTest.java | 15 +-
geaflow/pom.xml | 2 +-
32 files changed, 1077 insertions(+), 221 deletions(-)
diff --git a/geaflow/geaflow-core/geaflow-api/src/main/java/org/apache/geaflow/view/IViewDesc.java b/geaflow/geaflow-core/geaflow-api/src/main/java/org/apache/geaflow/view/IViewDesc.java
index 272d896d..9bd006b6 100644
--- a/geaflow/geaflow-core/geaflow-api/src/main/java/org/apache/geaflow/view/IViewDesc.java
+++ b/geaflow/geaflow-core/geaflow-api/src/main/java/org/apache/geaflow/view/IViewDesc.java
@@ -64,6 +64,8 @@ public interface IViewDesc extends Serializable {
RocksDB,
// Memory backend.
Memory,
+ // Paimon backend.
+ Paimon,
// Custom backend.
Custom;
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/GraphViewMaterializeOp.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/GraphViewMaterializeOp.java
index fa4d6aef..46b44048 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/GraphViewMaterializeOp.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/GraphViewMaterializeOp.java
@@ -24,6 +24,7 @@ import static org.apache.geaflow.operator.Constants.GRAPH_VERSION;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
+import java.util.List;
import org.apache.geaflow.api.graph.materialize.GraphMaterializeFunction;
import org.apache.geaflow.api.trait.CheckpointTrait;
import org.apache.geaflow.api.trait.TransactionTrait;
@@ -36,12 +37,17 @@ import org.apache.geaflow.state.DataModel;
import org.apache.geaflow.state.GraphState;
import org.apache.geaflow.state.StateFactory;
import org.apache.geaflow.state.descriptor.GraphStateDescriptor;
+import org.apache.geaflow.store.paimon.commit.PaimonCommitRegistry;
+import org.apache.geaflow.store.paimon.commit.PaimonCommitRegistry.TaskCommitMessage;
+import org.apache.geaflow.store.paimon.commit.PaimonMessage;
import org.apache.geaflow.utils.keygroup.IKeyGroupAssigner;
import org.apache.geaflow.utils.keygroup.KeyGroup;
import org.apache.geaflow.utils.keygroup.KeyGroupAssignerFactory;
import org.apache.geaflow.utils.keygroup.KeyGroupAssignment;
+import org.apache.geaflow.view.IViewDesc.BackendType;
import org.apache.geaflow.view.graph.GraphViewDesc;
import org.apache.geaflow.view.meta.ViewMetaBookKeeper;
+import org.apache.paimon.table.sink.CommitMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +78,8 @@ public class GraphViewMaterializeOp<K, VV, EV> extends AbstractOneInputOperator<
int maxPara = graphViewDesc.getShardNum();
int taskPara = runtimeContext.getTaskArgs().getParallelism();
Preconditions.checkArgument(taskPara <= maxPara,
-            String.format("task parallelism '%s' must be <= shard num(max parallelism) '%s'", taskPara, maxPara));
+            String.format("task parallelism '%s' must be <= shard num(max parallelism) '%s'",
+                taskPara, maxPara));
int taskIndex = runtimeContext.getTaskArgs().getTaskIndex();
         KeyGroup keyGroup = KeyGroupAssignment.computeKeyGroupRangeForOperatorIndex(maxPara,
@@ -85,7 +92,8 @@ public class GraphViewMaterializeOp<K, VV, EV> extends AbstractOneInputOperator<
int taskId = runtimeContext.getTaskArgs().getTaskId();
LOGGER.info("opName:{} taskId:{} taskIndex:{} keyGroup:{}", name,
taskId,
taskIndex, keyGroup);
-        this.graphState = StateFactory.buildGraphState(descriptor, runtimeContext.getConfiguration());
+        this.graphState = StateFactory.buildGraphState(descriptor,
+            runtimeContext.getConfiguration());
recover();
this.function = new DynamicGraphMaterializeFunction<>(graphState);
}
@@ -105,12 +113,28 @@ public class GraphViewMaterializeOp<K, VV, EV> extends AbstractOneInputOperator<
this.graphState.manage().operate().setCheckpointId(checkpointId);
this.graphState.manage().operate().finish();
this.graphState.manage().operate().archive();
+
+        if (graphViewDesc.getBackend() == BackendType.Paimon) {
+            int taskIndex = runtimeContext.getTaskArgs().getTaskIndex();
+
+            PaimonCommitRegistry registry = PaimonCommitRegistry.getInstance();
+            List<TaskCommitMessage> messages = registry.pollMessages(taskIndex);
+            if (messages != null && !messages.isEmpty()) {
+                for (TaskCommitMessage message : messages) {
+                    List<CommitMessage> msg = message.getMessages();
+                    LOGGER.info("task {} emits windowId:{} chkId:{} table:{} messages:{}",
+                        taskIndex, windowId, checkpointId, message.getTableName(),
+                        msg.size());
+                    collectValue(new PaimonMessage(checkpointId, message.getTableName(),
+                        msg));
+                }
+            }
+        }
LOGGER.info("do checkpoint over, checkpointId: {}", checkpointId);
}
@Override
public void finish(long windowId) {
-
}
@Override
@@ -159,7 +183,8 @@ public class GraphViewMaterializeOp<K, VV, EV> extends AbstractOneInputOperator<
}
}
-    public static class DynamicGraphMaterializeFunction<K, VV, EV> implements GraphMaterializeFunction<K, VV, EV> {
+    public static class DynamicGraphMaterializeFunction<K, VV, EV> implements
+        GraphMaterializeFunction<K, VV, EV> {
private final GraphState<K, VV, EV> graphState;
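PaimonCommitRegistry and TaskCommitMessage are introduced by this patch; only getInstance(), pollMessages(int) and TaskCommitMessage's getTableName()/getMessages() are visible in the hunk above. A hypothetical reconstruction of such a hand-off registry, sketched purely from that usage (all names, fields and method bodies are assumptions, not the committed class):

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArrayList;
    import org.apache.paimon.table.sink.CommitMessage;

    // Hypothetical sketch: a per-JVM hand-off point where store shards deposit
    // prepared CommitMessages and the materialize operator drains them by task
    // index at checkpoint time.
    public final class CommitRegistrySketch {

        private static final CommitRegistrySketch INSTANCE = new CommitRegistrySketch();

        private final Map<Integer, List<TaskCommitMessage>> pending = new ConcurrentHashMap<>();

        public static CommitRegistrySketch getInstance() {
            return INSTANCE;
        }

        public void register(int taskIndex, TaskCommitMessage message) {
            pending.computeIfAbsent(taskIndex, k -> new CopyOnWriteArrayList<>()).add(message);
        }

        // Draining on poll keeps a checkpoint's messages from being emitted twice.
        public List<TaskCommitMessage> pollMessages(int taskIndex) {
            return pending.remove(taskIndex);
        }

        public static final class TaskCommitMessage {
            private final String tableName;
            private final List<CommitMessage> messages;

            public TaskCommitMessage(String tableName, List<CommitMessage> messages) {
                this.tableName = tableName;
                this.messages = messages;
            }

            public String getTableName() {
                return tableName;
            }

            public List<CommitMessage> getMessages() {
                return messages;
            }
        }
    }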
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/PaimonGlobalSink.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/PaimonGlobalSink.java
new file mode 100644
index 00000000..44ba94ae
--- /dev/null
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/PaimonGlobalSink.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.operator.impl.graph.materialize;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.geaflow.api.context.RuntimeContext;
+import org.apache.geaflow.api.function.RichWindowFunction;
+import org.apache.geaflow.api.function.io.SinkFunction;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
+import org.apache.geaflow.common.exception.GeaflowRuntimeException;
+import org.apache.geaflow.store.paimon.PaimonCatalogClient;
+import org.apache.geaflow.store.paimon.PaimonCatalogManager;
+import org.apache.geaflow.store.paimon.commit.PaimonMessage;
+import org.apache.geaflow.store.paimon.config.PaimonConfigKeys;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PaimonGlobalSink extends RichWindowFunction implements SinkFunction<PaimonMessage> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PaimonGlobalSink.class);
+
+ private String dbName;
+ private String jobName;
+ private long windowId;
+
+ private List<PaimonMessage> paimonMessages;
+ private Map<String, StreamWriteBuilder> writeBuilders;
+ private PaimonCatalogClient client;
+
+ @Override
+ public void open(RuntimeContext runtimeContext) {
+ this.windowId = runtimeContext.getWindowId();
+ Configuration jobConfig = runtimeContext.getConfiguration();
+ this.client = PaimonCatalogManager.getCatalogClient(jobConfig);
+ this.jobName = jobConfig.getString(ExecutionConfigKeys.JOB_APP_NAME);
+        this.dbName = jobConfig.getString(PaimonConfigKeys.PAIMON_STORE_DATABASE);
+
+ this.writeBuilders = new HashMap<>();
+ this.paimonMessages = new ArrayList<>();
+ LOGGER.info("init paimon sink with db {}", dbName);
+ }
+
+ @Override
+ public void write(PaimonMessage message) throws Exception {
+ paimonMessages.add(message);
+ }
+
+ @Override
+ public void finish() {
+ if (paimonMessages.isEmpty()) {
+ LOGGER.info("commit windowId {} empty messages", windowId);
+ return;
+ }
+
+ final long startTime = System.currentTimeMillis();
+ final long checkpointId = paimonMessages.get(0).getCheckpointId();
+ Map<String, List<CommitMessage>> tableMessages = new HashMap<>();
+ for (PaimonMessage message : paimonMessages) {
+ List<CommitMessage> messages = message.getMessages();
+ if (messages != null && !messages.isEmpty()) {
+ String tableName = message.getTableName();
+                List<CommitMessage> commitMessages = tableMessages.computeIfAbsent(tableName,
+                    k -> new ArrayList<>());
+ commitMessages.addAll(messages);
+ }
+ }
+ long deserializeTime = System.currentTimeMillis() - startTime;
+ if (!tableMessages.isEmpty()) {
+            for (Map.Entry<String, List<CommitMessage>> entry : tableMessages.entrySet()) {
+                LOGGER.info("commit table:{} messages:{}", entry.getKey(), entry.getValue().size());
+                StreamWriteBuilder writeBuilder = getWriteBuilder(entry.getKey());
+ try (StreamTableCommit commit = writeBuilder.newCommit()) {
+ try {
+ commit.commit(checkpointId, entry.getValue());
+ } catch (Throwable e) {
+ LOGGER.warn("commit failed: {}", e.getMessage(), e);
+                        Map<Long, List<CommitMessage>> commitIdAndMessages = new HashMap<>();
+                        commitIdAndMessages.put(checkpointId, entry.getValue());
+ commit.filterAndCommit(commitIdAndMessages);
+ }
+ } catch (Throwable e) {
+ LOGGER.error("Failed to commit data into Paimon: {}",
e.getMessage(), e);
+ throw new GeaflowRuntimeException("Failed to commit data
into Paimon.", e);
+ }
+ }
+ }
+ LOGGER.info("committed chkId:{} messages:{} deserializeCost:{}ms",
+ checkpointId, paimonMessages.size(), deserializeTime);
+ paimonMessages.clear();
+ }
+
+ private StreamWriteBuilder getWriteBuilder(String tableName) {
+ return writeBuilders.computeIfAbsent(tableName, k -> {
+ try {
+                FileStoreTable table = (FileStoreTable) client.getTable(Identifier.create(dbName,
+                    tableName));
+ return table.newStreamWriteBuilder().withCommitUser(jobName);
+ } catch (Throwable e) {
+                String msg = String.format("%s.%s not exist.", dbName, tableName);
+ throw new GeaflowRuntimeException(msg, e);
+ }
+ });
+ }
+
+ @Override
+ public void close() {
+ LOGGER.info("close sink");
+ if (client != null) {
+ client.close();
+ }
+ }
+
+}
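The fallback in PaimonGlobalSink's catch block uses StreamTableCommit.filterAndCommit, Paimon's idempotent retry entry point: it checks which commit identifiers this commit user has already written and re-commits only the missing ones, so a replayed checkpoint cannot produce duplicate snapshots. A minimal standalone sketch of that retry pattern (table and commit user are illustrative):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.paimon.table.Table;
    import org.apache.paimon.table.sink.CommitMessage;
    import org.apache.paimon.table.sink.StreamTableCommit;

    public class IdempotentCommitSketch {

        static void commitWithRetry(Table table, long checkpointId, List<CommitMessage> messages)
            throws Exception {
            try (StreamTableCommit commit =
                     table.newStreamWriteBuilder().withCommitUser("job-name").newCommit()) {
                try {
                    commit.commit(checkpointId, messages);
                } catch (Throwable t) {
                    // filterAndCommit consults the commit user's latest snapshot and
                    // re-commits only identifiers that never landed.
                    Map<Long, List<CommitMessage>> retry = new HashMap<>();
                    retry.put(checkpointId, messages);
                    commit.filterAndCommit(retry);
                }
            }
        }
    }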
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/graph/view/materialize/MaterializedIncGraph.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/graph/view/materialize/MaterializedIncGraph.java
index a8beef88..45bfd98a 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/graph/view/materialize/MaterializedIncGraph.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/graph/view/materialize/MaterializedIncGraph.java
@@ -27,14 +27,19 @@ import org.apache.geaflow.model.graph.edge.IEdge;
import org.apache.geaflow.model.graph.vertex.IVertex;
import org.apache.geaflow.operator.base.AbstractOperator;
 import org.apache.geaflow.operator.impl.graph.materialize.GraphViewMaterializeOp;
+import org.apache.geaflow.operator.impl.graph.materialize.PaimonGlobalSink;
+import org.apache.geaflow.operator.impl.window.SinkOperator;
import org.apache.geaflow.partitioner.IPartitioner;
import org.apache.geaflow.partitioner.impl.KeyPartitioner;
import org.apache.geaflow.pdata.graph.view.AbstractGraphView;
import org.apache.geaflow.pdata.graph.window.WindowStreamGraph;
import org.apache.geaflow.pdata.stream.Stream;
import org.apache.geaflow.pdata.stream.TransformType;
+import org.apache.geaflow.pdata.stream.window.WindowStreamSink;
import org.apache.geaflow.pipeline.context.IPipelineContext;
+import org.apache.geaflow.store.paimon.commit.PaimonMessage;
import org.apache.geaflow.view.IViewDesc;
+import org.apache.geaflow.view.IViewDesc.BackendType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +72,16 @@ public class MaterializedIncGraph<K, VV, EV> extends AbstractGraphView<K, VV, EV
+ "edgeStream parallelism must <= number of graph shard num";
this.edgeStream = this.edgeStream
.keyBy(new WindowStreamGraph.DefaultEdgePartition<>());
- super.context.addPAction(this);
+
+ if (graphViewDesc.getBackend() == BackendType.Paimon) {
+ SinkOperator<PaimonMessage> operator =
+ new SinkOperator<>(new PaimonGlobalSink());
+            WindowStreamSink globalSink = new WindowStreamSink<>(this, operator);
+ globalSink.withParallelism(1);
+ super.context.addPAction(globalSink);
+ } else {
+ super.context.addPAction(this);
+ }
         assert this.getParallelism() == graphViewDesc.getShardNum() : "Materialize parallelism "
+ "must be equal to the graph shard num.";
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/stream/window/WindowStreamSink.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/stream/window/WindowStreamSink.java
index 4dc6de78..9c346013 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/stream/window/WindowStreamSink.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/stream/window/WindowStreamSink.java
@@ -35,8 +35,7 @@ public class WindowStreamSink<T> extends Stream<T> implements PStreamSink<T> {
super(pipelineContext, operator);
}
- public WindowStreamSink(Stream stream,
- AbstractOperator operator) {
+ public WindowStreamSink(Stream stream, AbstractOperator operator) {
super(stream, operator);
}
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/main/java/org/apache/geaflow/plan/PipelinePlanBuilder.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/main/java/org/apache/geaflow/plan/PipelinePlanBuilder.java
index e9240115..8ab196b4 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/main/java/org/apache/geaflow/plan/PipelinePlanBuilder.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/main/java/org/apache/geaflow/plan/PipelinePlanBuilder.java
@@ -102,7 +102,8 @@ public class PipelinePlanBuilder implements Serializable {
});
         boolean isSingleWindow = this.pipelineGraph.getSourceVertices().stream().allMatch(v ->
-            ((AbstractOperator) v.getOperator()).getOpArgs().getOpType() == OpArgs.OpType.SINGLE_WINDOW_SOURCE);
+            ((AbstractOperator) v.getOperator()).getOpArgs().getOpType()
+                == OpArgs.OpType.SINGLE_WINDOW_SOURCE);
if (isSingleWindow) {
pipelineContext.getConfig().put(BATCH_NUMBER_PER_CHECKPOINT.getKey(),
String.valueOf(SINGLE_WINDOW_CHECKPOINT_DURATION));
@@ -143,7 +144,8 @@ public class PipelinePlanBuilder implements Serializable {
if (action instanceof PGraphMaterialize) {
visitMaterializeAction((PGraphMaterialize) action);
} else {
-            PipelineVertex pipelineVertex = new PipelineVertex(vId, stream.getOperator(), stream.getParallelism());
+            PipelineVertex pipelineVertex = new PipelineVertex(vId, stream.getOperator(),
+                stream.getParallelism());
if (action instanceof PStreamSink) {
pipelineVertex.setType(VertexType.sink);
pipelineVertex.setVertexMode(VertexMode.append);
@@ -178,11 +180,13 @@ public class PipelinePlanBuilder implements Serializable {
         Preconditions.checkArgument(vertexStreamInput != null && edgeStreamInput != null,
"input vertex and edge stream must be not null");
-        PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++, vertexStreamInput.getId(),
+        PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++,
+            vertexStreamInput.getId(),
             stream.getId(), vertexStreamInput.getPartition(), vertexStreamInput.getEncoder());
vertexInputEdge.setEdgeName(GraphRecordNames.Vertex.name());
this.pipelineGraph.addEdge(vertexInputEdge);
-        PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++, edgeStreamInput.getId(),
+        PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++,
+            edgeStreamInput.getId(),
             stream.getId(), edgeStreamInput.getPartition(), edgeStreamInput.getEncoder());
edgeInputEdge.setEdgeName(GraphRecordNames.Edge.name());
this.pipelineGraph.addEdge(edgeInputEdge);
@@ -207,6 +211,30 @@ public class PipelinePlanBuilder implements Serializable {
pipelineVertex.setAffinity(AffinityLevel.worker);
break;
}
+            case ContinueGraphMaterialize:
+                pipelineVertex.setType(VertexType.inc_process);
+                pipelineVertex.setAffinity(AffinityLevel.worker);
+                MaterializedIncGraph pGraphMaterialize = (MaterializedIncGraph) stream;
+
+                Stream vertexInput = pGraphMaterialize.getInput();
+                Stream edgeInput = (Stream) pGraphMaterialize.getEdges();
+                Preconditions.checkArgument(vertexInput != null && edgeInput != null,
+                    "input vertex and edge stream must be not null");
+
+                PipelineEdge vertexEdge = new PipelineEdge(this.edgeIdGenerator++,
+                    vertexInput.getId(),
+                    stream.getId(), vertexInput.getPartition(), vertexInput.getEncoder());
+                vertexEdge.setEdgeName(GraphRecordNames.Vertex.name());
+                this.pipelineGraph.addEdge(vertexEdge);
+                PipelineEdge edgeEdge = new PipelineEdge(this.edgeIdGenerator++,
+                    edgeInput.getId(),
+                    stream.getId(), edgeInput.getPartition(), edgeInput.getEncoder());
+                edgeEdge.setEdgeName(GraphRecordNames.Edge.name());
+                this.pipelineGraph.addEdge(edgeEdge);
+
+                visitNode(vertexInput);
+                visitNode(edgeInput);
+                break;
case ContinueGraphCompute: {
pipelineVertex.setType(VertexType.inc_iterator);
pipelineVertex.setAffinity(AffinityLevel.worker);
@@ -217,26 +245,33 @@ public class PipelinePlanBuilder implements Serializable {
pipelineVertex.setType(VertexType.inc_vertex_centric);
break;
default:
- throw new GeaflowRuntimeException("not support
graph compute type, " + computeType);
+ throw new GeaflowRuntimeException(
+ "not support graph compute type, " +
computeType);
}
pipelineVertex.setIterations(pGraphCompute.getMaxIterations());
Stream vertexStreamInput = pGraphCompute.getInput();
Stream edgeStreamInput = (Stream) pGraphCompute.getEdges();
-            Preconditions.checkArgument(vertexStreamInput != null && edgeStreamInput != null,
+ Preconditions.checkArgument(
+ vertexStreamInput != null && edgeStreamInput != null,
"input vertex and edge stream must be not null");
-            PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++, vertexStreamInput.getId(),
-                stream.getId(), vertexStreamInput.getPartition(), vertexStreamInput.getEncoder());
+            PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++,
+                vertexStreamInput.getId(),
+                stream.getId(), vertexStreamInput.getPartition(),
+                vertexStreamInput.getEncoder());
vertexInputEdge.setEdgeName(GraphRecordNames.Vertex.name());
this.pipelineGraph.addEdge(vertexInputEdge);
-            PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++, edgeStreamInput.getId(),
-                stream.getId(), edgeStreamInput.getPartition(), edgeStreamInput.getEncoder());
+            PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++,
+                edgeStreamInput.getId(),
+                stream.getId(), edgeStreamInput.getPartition(),
+                edgeStreamInput.getEncoder());
edgeInputEdge.setEdgeName(GraphRecordNames.Edge.name());
this.pipelineGraph.addEdge(edgeInputEdge);
// iteration loop edge
-            PipelineEdge iterationEdge = buildIterationEdge(vId, pGraphCompute.getMsgEncoder());
+ PipelineEdge iterationEdge = buildIterationEdge(vId,
+ pGraphCompute.getMsgEncoder());
this.pipelineGraph.addEdge(iterationEdge);
buildIterationAggVertexAndEdge(pipelineVertex);
@@ -255,26 +290,33 @@ public class PipelinePlanBuilder implements Serializable {
pipelineVertex.setType(VertexType.vertex_centric);
break;
default:
- throw new GeaflowRuntimeException("not support
graph compute type, " + computeType);
+ throw new GeaflowRuntimeException(
+ "not support graph compute type, " +
computeType);
}
pipelineVertex.setIterations(pGraphCompute.getMaxIterations());
Stream vertexStreamInput = pGraphCompute.getInput();
Stream edgeStreamInput = (Stream) pGraphCompute.getEdges();
-            Preconditions.checkArgument(vertexStreamInput != null && edgeStreamInput != null,
+ Preconditions.checkArgument(
+ vertexStreamInput != null && edgeStreamInput != null,
"input vertex and edge stream must be not null");
-            PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++, vertexStreamInput.getId(),
-                stream.getId(), vertexStreamInput.getPartition(), vertexStreamInput.getEncoder());
+            PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++,
+                vertexStreamInput.getId(),
+                stream.getId(), vertexStreamInput.getPartition(),
+                vertexStreamInput.getEncoder());
vertexInputEdge.setEdgeName(GraphRecordNames.Vertex.name());
this.pipelineGraph.addEdge(vertexInputEdge);
-            PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++, edgeStreamInput.getId(),
-                stream.getId(), edgeStreamInput.getPartition(), edgeStreamInput.getEncoder());
+            PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++,
+                edgeStreamInput.getId(),
+                stream.getId(), edgeStreamInput.getPartition(),
+                edgeStreamInput.getEncoder());
edgeInputEdge.setEdgeName(GraphRecordNames.Edge.name());
this.pipelineGraph.addEdge(edgeInputEdge);
// iteration loop edge
-            PipelineEdge iterationEdge = buildIterationEdge(vId, pGraphCompute.getMsgEncoder());
+ PipelineEdge iterationEdge = buildIterationEdge(vId,
+ pGraphCompute.getMsgEncoder());
this.pipelineGraph.addEdge(iterationEdge);
buildIterationAggVertexAndEdge(pipelineVertex);
@@ -293,28 +335,35 @@ public class PipelinePlanBuilder implements Serializable {
pipelineVertex.setType(VertexType.vertex_centric);
break;
default:
- throw new GeaflowRuntimeException("not support
graph traversal type, " + traversalType);
+ throw new GeaflowRuntimeException(
+ "not support graph traversal type, " +
traversalType);
}
pipelineVertex.setIterations(windowGraph.getMaxIterations());
Stream vertexStreamInput = windowGraph.getInput();
Stream edgeStreamInput = (Stream) windowGraph.getEdges();
-            Preconditions.checkArgument(vertexStreamInput != null && edgeStreamInput != null,
+ Preconditions.checkArgument(
+ vertexStreamInput != null && edgeStreamInput != null,
"input vertex and edge stream must be not null");
-            PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++, vertexStreamInput.getId(),
-                stream.getId(), vertexStreamInput.getPartition(), vertexStreamInput.getEncoder());
+            PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++,
+                vertexStreamInput.getId(),
+                stream.getId(), vertexStreamInput.getPartition(),
+                vertexStreamInput.getEncoder());
vertexInputEdge.setEdgeName(GraphRecordNames.Vertex.name());
this.pipelineGraph.addEdge(vertexInputEdge);
-            PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++, edgeStreamInput.getId(),
-                stream.getId(), edgeStreamInput.getPartition(), edgeStreamInput.getEncoder());
+            PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++,
+                edgeStreamInput.getId(),
+                stream.getId(), edgeStreamInput.getPartition(),
+                edgeStreamInput.getEncoder());
edgeInputEdge.setEdgeName(GraphRecordNames.Edge.name());
this.pipelineGraph.addEdge(edgeInputEdge);
// Add request input.
if (windowGraph.getRequestStream() != null) {
                 Stream requestStreamInput = (Stream) windowGraph.getRequestStream();
-                PipelineEdge requestInputEdge = new PipelineEdge(this.edgeIdGenerator++, requestStreamInput.getId(),
+                PipelineEdge requestInputEdge = new PipelineEdge(this.edgeIdGenerator++,
+                    requestStreamInput.getId(),
                     stream.getId(), requestStreamInput.getPartition(), requestStreamInput.getEncoder());
requestInputEdge.setEdgeName(GraphRecordNames.Request.name());
@@ -323,7 +372,8 @@ public class PipelinePlanBuilder implements Serializable {
}
// iteration loop edge
-            PipelineEdge iterationEdge = buildIterationEdge(vId, windowGraph.getMsgEncoder());
+ PipelineEdge iterationEdge = buildIterationEdge(vId,
+ windowGraph.getMsgEncoder());
this.pipelineGraph.addEdge(iterationEdge);
buildIterationAggVertexAndEdge(pipelineVertex);
@@ -342,31 +392,38 @@ public class PipelinePlanBuilder implements Serializable {
pipelineVertex.setType(VertexType.inc_vertex_centric);
break;
default:
- throw new GeaflowRuntimeException("not support
graph traversal type, " + traversalType);
+ throw new GeaflowRuntimeException(
+ "not support graph traversal type, " +
traversalType);
}
pipelineVertex.setIterations(windowGraph.getMaxIterations());
Stream vertexStreamInput = windowGraph.getInput();
Stream edgeStreamInput = (Stream) windowGraph.getEdges();
-            Preconditions.checkArgument(vertexStreamInput != null && edgeStreamInput != null,
+ Preconditions.checkArgument(
+ vertexStreamInput != null && edgeStreamInput != null,
"input vertex and edge stream must be not null");
// Add vertex input.
-            PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++, vertexStreamInput.getId(),
-                stream.getId(), vertexStreamInput.getPartition(), vertexStreamInput.getEncoder());
+            PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++,
+                vertexStreamInput.getId(),
+                stream.getId(), vertexStreamInput.getPartition(),
+                vertexStreamInput.getEncoder());
vertexInputEdge.setEdgeName(GraphRecordNames.Vertex.name());
this.pipelineGraph.addEdge(vertexInputEdge);
// Add edge input.
-            PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++, edgeStreamInput.getId(),
-                stream.getId(), edgeStreamInput.getPartition(), edgeStreamInput.getEncoder());
+            PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++,
+                edgeStreamInput.getId(),
+                stream.getId(), edgeStreamInput.getPartition(),
+                edgeStreamInput.getEncoder());
edgeInputEdge.setEdgeName(GraphRecordNames.Edge.name());
this.pipelineGraph.addEdge(edgeInputEdge);
// Add request input.
if (windowGraph.getRequestStream() != null) {
                 Stream requestStreamInput = (Stream) windowGraph.getRequestStream();
-                PipelineEdge requestInputEdge = new PipelineEdge(this.edgeIdGenerator++, requestStreamInput.getId(),
+                PipelineEdge requestInputEdge = new PipelineEdge(this.edgeIdGenerator++,
+                    requestStreamInput.getId(),
                     stream.getId(), requestStreamInput.getPartition(), requestStreamInput.getEncoder());
requestInputEdge.setEdgeName(GraphRecordNames.Request.name());
@@ -375,7 +432,8 @@ public class PipelinePlanBuilder implements Serializable {
}
// Add iteration loop edge
-            PipelineEdge iterationEdge = buildIterationEdge(vId, windowGraph.getMsgEncoder());
+ PipelineEdge iterationEdge = buildIterationEdge(vId,
+ windowGraph.getMsgEncoder());
this.pipelineGraph.addEdge(iterationEdge);
buildIterationAggVertexAndEdge(pipelineVertex);
@@ -387,9 +445,11 @@ public class PipelinePlanBuilder implements Serializable {
case StreamTransform: {
pipelineVertex.setType(VertexType.process);
Stream inputStream = stream.getInput();
-                Preconditions.checkArgument(inputStream != null, "input stream must be not null");
+ Preconditions.checkArgument(inputStream != null,
+ "input stream must be not null");
-                PipelineEdge pipelineEdge = new PipelineEdge(this.edgeIdGenerator++, inputStream.getId(), stream.getId(),
+                PipelineEdge pipelineEdge = new PipelineEdge(this.edgeIdGenerator++,
+ inputStream.getId(), stream.getId(),
inputStream.getPartition(), inputStream.getEncoder());
this.pipelineGraph.addEdge(pipelineEdge);
@@ -400,9 +460,11 @@ public class PipelinePlanBuilder implements Serializable {
pipelineVertex.setType(VertexType.inc_process);
pipelineVertex.setAffinity(AffinityLevel.worker);
Stream inputStream = stream.getInput();
-                Preconditions.checkArgument(inputStream != null, "input stream must be not null");
+ Preconditions.checkArgument(inputStream != null,
+ "input stream must be not null");
-                PipelineEdge pipelineEdge = new PipelineEdge(this.edgeIdGenerator++, inputStream.getId(), stream.getId(),
+                PipelineEdge pipelineEdge = new PipelineEdge(this.edgeIdGenerator++,
+ inputStream.getId(), stream.getId(),
inputStream.getPartition(), inputStream.getEncoder());
this.pipelineGraph.addEdge(pipelineEdge);
@@ -414,7 +476,8 @@ public class PipelinePlanBuilder implements Serializable {
WindowUnionStream unionStream = (WindowUnionStream) stream;
Stream mainInput = stream.getInput();
-                PipelineEdge mainEdge = new PipelineEdge(this.edgeIdGenerator++, mainInput.getId(), unionStream.getId(),
+                PipelineEdge mainEdge = new PipelineEdge(this.edgeIdGenerator++,
+ mainInput.getId(), unionStream.getId(),
mainInput.getPartition(), unionStream.getEncoder());
mainEdge.setStreamOrdinal(0);
this.pipelineGraph.addEdge(mainEdge);
@@ -423,8 +486,10 @@ public class PipelinePlanBuilder implements Serializable {
                 List<Stream> otherInputs = unionStream.getUnionWindowDataStreamList();
for (int index = 0; index < otherInputs.size(); index++) {
Stream otherInput = otherInputs.get(index);
-                    PipelineEdge rightEdge = new PipelineEdge(this.edgeIdGenerator++, otherInput.getId(),
-                        unionStream.getId(), otherInput.getPartition(), otherInput.getEncoder());
+                    PipelineEdge rightEdge = new PipelineEdge(this.edgeIdGenerator++,
+                        otherInput.getId(),
+                        unionStream.getId(), otherInput.getPartition(),
+                        otherInput.getEncoder());
rightEdge.setStreamOrdinal(index + 1);
this.pipelineGraph.addEdge(rightEdge);
visitNode(otherInput);
@@ -432,7 +497,8 @@ public class PipelinePlanBuilder implements Serializable {
break;
}
default:
- throw new GeaflowRuntimeException("Not supported transform
type: " + stream.getTransformType());
+ throw new GeaflowRuntimeException(
+ "Not supported transform type: " +
stream.getTransformType());
}
this.pipelineGraph.addVertex(pipelineVertex);
}
@@ -474,7 +540,8 @@ public class PipelinePlanBuilder implements Serializable {
// Union Optimization.
             boolean isExtraOptimizeSink = pipelineConfig.getBoolean(ENABLE_EXTRA_OPTIMIZE_SINK);
             new UnionOptimizer(isExtraOptimizeSink).optimizePlan(pipelineGraph);
-            LOGGER.info("union optimize: {}", new PlanGraphVisualization(pipelineGraph).getGraphviz());
+            LOGGER.info("union optimize: {}",
+                new PlanGraphVisualization(pipelineGraph).getGraphviz());
}
new PipelineGraphOptimizer().optimizePipelineGraph(pipelineGraph);
}
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/test/java/org/apache/geaflow/plan/PipelinePlanTest.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/test/java/org/apache/geaflow/plan/PipelinePlanTest.java
index 5926b0ca..26ff4f35 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/test/java/org/apache/geaflow/plan/PipelinePlanTest.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/test/java/org/apache/geaflow/plan/PipelinePlanTest.java
@@ -60,6 +60,7 @@ import org.apache.geaflow.plan.graph.PipelineVertex;
import org.apache.geaflow.plan.optimizer.PipelineGraphOptimizer;
import org.apache.geaflow.view.GraphViewBuilder;
import org.apache.geaflow.view.IViewDesc;
+import org.apache.geaflow.view.IViewDesc.BackendType;
import org.apache.geaflow.view.graph.GraphViewDesc;
import org.apache.geaflow.view.graph.PGraphView;
import org.apache.geaflow.view.graph.PIncGraphView;
@@ -186,14 +187,15 @@ public class PipelinePlanTest extends BasePlanTest {
Assert.assertEquals(vertexMap.size(), 8);
}
- @Test
- public void testMaterialize() {
- AtomicInteger idGenerator = new AtomicInteger(0);
- AbstractPipelineContext context = mock(AbstractPipelineContext.class);
-        when(context.generateId()).then(invocation -> idGenerator.incrementAndGet());
+ public void testMaterialize(IViewDesc.BackendType backendType) {
Configuration configuration = new Configuration();
configuration.put(FrameworkConfigKeys.INC_STREAM_MATERIALIZE_DISABLE,
Boolean.TRUE.toString());
- when(context.getConfig()).thenReturn(configuration);
+        AbstractPipelineContext context = new AbstractPipelineContext(configuration) {
+ @Override
+ public int generateId() {
+ return idGenerator.incrementAndGet();
+ }
+ };
         PWindowSource<Integer> source1 =
             new WindowStreamSource<>(context, new CollectionSource<>(new ArrayList<>()), SizeTumblingWindow.of(10));
@@ -208,7 +210,7 @@ public class PipelinePlanTest extends BasePlanTest {
final String graphName = "graph_view_name";
         GraphViewDesc graphViewDesc = GraphViewBuilder.createGraphView(graphName)
.withShardNum(4)
- .withBackend(IViewDesc.BackendType.RocksDB)
+ .withBackend(backendType)
             .withSchema(new GraphMetaType(IntegerType.INSTANCE, ValueVertex.class,
                 Integer.class, ValueEdge.class, IntegerType.class))
.build();
@@ -219,7 +221,6 @@ public class PipelinePlanTest extends BasePlanTest {
vertices.window(WindowFactory.createSizeTumblingWindow(1)),
edges.window(WindowFactory.createSizeTumblingWindow(1)));
incGraphView.materialize();
-        when(context.getActions()).thenReturn(ImmutableList.of(((IncGraphView) incGraphView).getMaterializedIncGraph()));
PipelinePlanBuilder planBuilder = new PipelinePlanBuilder();
PipelineGraph pipelineGraph = planBuilder.buildPlan(context);
@@ -227,8 +228,21 @@ public class PipelinePlanTest extends BasePlanTest {
optimizer.optimizePipelineGraph(pipelineGraph);
Map<Integer, PipelineVertex> vertexMap = pipelineGraph.getVertexMap();
- Assert.assertEquals(vertexMap.size(), 3);
+ if (backendType == BackendType.Paimon) {
+ Assert.assertEquals(vertexMap.size(), 4);
+ } else {
+ Assert.assertEquals(vertexMap.size(), 3);
+ }
+ }
+ @Test
+ public void testRocksDBMaterialize() {
+ testMaterialize(BackendType.RocksDB);
+ }
+
+ @Test
+ public void testPaimonMaterialize() {
+ testMaterialize(BackendType.Paimon);
}
@Test
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/engine/GeaFlowRuntimeGraph.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/engine/GeaFlowRuntimeGraph.java
index f8e52c30..168d1f32 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/engine/GeaFlowRuntimeGraph.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/engine/GeaFlowRuntimeGraph.java
@@ -19,6 +19,7 @@
package org.apache.geaflow.dsl.runtime.engine;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -73,6 +74,7 @@ import org.apache.geaflow.model.traversal.ITraversalRequest;
import org.apache.geaflow.model.traversal.ITraversalResponse;
 import org.apache.geaflow.operator.impl.graph.traversal.dynamic.DynamicGraphHelper;
import org.apache.geaflow.pipeline.job.IPipelineJobContext;
+import org.apache.geaflow.view.IViewDesc.BackendType;
import org.apache.geaflow.view.graph.GraphViewDesc;
import org.apache.geaflow.view.graph.PGraphView;
import org.apache.geaflow.view.graph.PIncGraphView;
@@ -179,6 +181,9 @@ public class GeaFlowRuntimeGraph implements RuntimeGraph {
             responsePWindow = staticGraphTraversal(staticGraph, parameterStartIds,
                 constantStartIds, executeDagGroup, maxTraversal, isAggTraversal, parallelism);
} else { // traversal on dynamic graph
+            Preconditions.checkArgument(graphViewDesc.getBackend() != BackendType.Paimon,
+                "paimon does not support dynamic graph traversal");
+
             boolean enableIncrTraversal = DynamicGraphHelper.enableIncrTraversal(maxTraversal, startIds.size(),
                 context.getConfig());
if (maxTraversal != Integer.MAX_VALUE) {
@@ -357,6 +362,7 @@ public class GeaFlowRuntimeGraph implements RuntimeGraph {
PWindowStream<ITraversalResponse<Row>> responsePWindow;
assert graphView instanceof PIncGraphView : "Illegal graph view";
queryContext.addMaterializedGraph(graph.getName());
+
         if (vertexStream == null && edgeStream == null) { // traversal on snapshot of the dynamic graph
PGraphWindow<Object, Row, Row> staticGraph =
graphView.snapshot(graphViewDesc.getCurrentVersion());
             boolean enableAlgorithmSplit = algorithm instanceof IncrementalAlgorithmUserFunction;
@@ -376,6 +382,9 @@ public class GeaFlowRuntimeGraph implements RuntimeGraph {
                     graphAlgorithm.getParams(), graphSchema, parallelism)).start();
}
} else { // traversal on dynamic graph
+            Preconditions.checkArgument(graphViewDesc.getBackend() != BackendType.Paimon,
+                "paimon does not support dynamic graph traversal");
+
             vertexStream = vertexStream != null ? vertexStream :
                 queryContext.getEngineContext().createRuntimeTable(queryContext, Collections.emptyList())
                     .getPlan();
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLInsertTest.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLInsertTest.java
index 7d7d34e5..2b750f3a 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLInsertTest.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLInsertTest.java
@@ -20,6 +20,8 @@
package org.apache.geaflow.dsl.runtime.query;
import org.apache.geaflow.common.config.keys.FrameworkConfigKeys;
+import org.apache.geaflow.common.exception.GeaflowRuntimeException;
+import org.apache.geaflow.store.paimon.config.PaimonConfigKeys;
import org.testng.annotations.Test;
public class GQLInsertTest {
@@ -114,4 +116,24 @@ public class GQLInsertTest {
.execute()
.checkSinkResult();
}
+
+ @Test
+ public void testInsertAndQuery_006() throws Exception {
+ QueryTester
+ .build()
+            .withConfig(PaimonConfigKeys.PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE.getKey(), "true")
+ .withQueryPath("/query/gql_insert_and_graph_006.sql")
+ .execute()
+ .checkSinkResult();
+ }
+
+ @Test(expectedExceptions = GeaflowRuntimeException.class)
+ public void testInsertAndQuery_007() throws Exception {
+ QueryTester
+ .build()
+            .withConfig(PaimonConfigKeys.PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE.getKey(), "true")
+ .withQueryPath("/query/gql_insert_and_graph_007.sql")
+ .execute()
+ .checkSinkResult();
+ }
}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_insert_and_graph_006.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_insert_and_graph_006.txt
new file mode 100644
index 00000000..e69de29b
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_insert_and_graph_006.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_insert_and_graph_006.sql
new file mode 100644
index 00000000..47f57dd4
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_insert_and_graph_006.sql
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+set geaflow.dsl.window.size = 3;
+
+CREATE GRAPH dy_modern (
+ Vertex person (
+ id bigint ID,
+ name varchar,
+ age int
+ ),
+ Vertex software (
+ id bigint ID,
+ name varchar,
+ lang varchar
+ ),
+ Edge knows (
+ srcId bigint SOURCE ID,
+ targetId bigint DESTINATION ID,
+ weight double
+ ),
+ Edge created (
+ srcId bigint SOURCE ID,
+ targetId bigint DESTINATION ID,
+ weight double
+ )
+) WITH (
+ storeType='paimon',
+ shardCount = 1
+);
+
+CREATE TABLE tbl_result (
+ id bigint,
+ name varchar,
+ age int
+) WITH (
+ type='file',
+ geaflow.dsl.file.path='${target}'
+);
+
+CREATE TABLE tbl_person (
+ id bigint,
+ name varchar,
+ age int
+) WITH (
+ type='file',
+ geaflow.dsl.file.path='resource:///data/modern_vertex_person.txt'
+);
+
+USE GRAPH dy_modern;
+
+INSERT INTO dy_modern.person(id, name, age)
+SELECT * FROM tbl_person;
+;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_insert_and_graph_007.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_insert_and_graph_007.sql
new file mode 100644
index 00000000..c0586dc1
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_insert_and_graph_007.sql
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+set geaflow.dsl.window.size = 3;
+
+CREATE GRAPH dy_modern (
+ Vertex person (
+ id bigint ID,
+ name varchar,
+ age int
+ ),
+ Vertex software (
+ id bigint ID,
+ name varchar,
+ lang varchar
+ ),
+ Edge knows (
+ srcId bigint SOURCE ID,
+ targetId bigint DESTINATION ID,
+ weight double
+ ),
+ Edge created (
+ srcId bigint SOURCE ID,
+ targetId bigint DESTINATION ID,
+ weight double
+ )
+) WITH (
+ storeType='paimon',
+ shardCount = 1
+);
+
+CREATE TABLE tbl_result (
+ id bigint,
+ name varchar,
+ age int
+) WITH (
+ type='file',
+ geaflow.dsl.file.path='${target}'
+);
+
+CREATE TABLE tbl_person (
+ id bigint,
+ name varchar,
+ age int
+) WITH (
+ type='file',
+ geaflow.dsl.file.path='resource:///data/modern_vertex_person.txt'
+);
+
+USE GRAPH dy_modern;
+
+INSERT INTO dy_modern.person(id, name, age)
+SELECT * FROM tbl_person;
+;
+
+INSERT INTO tbl_result
+MATCH (a:person)
+RETURN a.id, a.name, a.age
+;
diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonGraphStore.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonGraphStore.java
index 479786c1..49db1c58 100644
--- a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonGraphStore.java
+++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonGraphStore.java
@@ -19,27 +19,75 @@
package org.apache.geaflow.store.paimon;
+import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_DATABASE;
+import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_EDGE_TABLE;
+import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_INDEX_TABLE;
+import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_VERTEX_TABLE;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.state.pushdown.inner.CodeGenFilterConverter;
import org.apache.geaflow.state.pushdown.inner.DirectFilterConverter;
import org.apache.geaflow.state.pushdown.inner.IFilterConverter;
import org.apache.geaflow.store.api.graph.IPushDownStore;
import org.apache.geaflow.store.config.StoreConfigKeys;
import org.apache.geaflow.store.context.StoreContext;
+import org.apache.paimon.catalog.Identifier;
 public abstract class BasePaimonGraphStore extends BasePaimonStore implements IPushDownStore {
protected IFilterConverter filterConverter;
+ protected String vertexTable;
+ protected String edgeTable;
+ protected String indexTable;
@Override
public void init(StoreContext storeContext) {
super.init(storeContext);
-        boolean codegenEnable =
-            storeContext.getConfig().getBoolean(StoreConfigKeys.STORE_FILTER_CODEGEN_ENABLE);
-        filterConverter = codegenEnable ? new CodeGenFilterConverter() : new DirectFilterConverter();
+ Configuration config = storeContext.getConfig();
+        boolean codegenEnable = config.getBoolean(StoreConfigKeys.STORE_FILTER_CODEGEN_ENABLE);
+        filterConverter =
+            codegenEnable ? new CodeGenFilterConverter() : new DirectFilterConverter();
+        // use distributed table when distributed mode or database is configured.
+ if (config.contains(PAIMON_STORE_DATABASE) || isDistributedMode) {
+ paimonStoreName = config.getString(PAIMON_STORE_DATABASE);
+ vertexTable = config.getString(PAIMON_STORE_VERTEX_TABLE);
+ edgeTable = config.getString(PAIMON_STORE_EDGE_TABLE);
+ indexTable = config.getString(PAIMON_STORE_INDEX_TABLE);
+ }
}
@Override
public IFilterConverter getFilterConverter() {
return filterConverter;
}
+
+ protected PaimonTableRWHandle createVertexTable(int shardId) {
+ String tableName = vertexTable;
+ if (StringUtils.isEmpty(tableName)) {
+ tableName = String.format("%s#%s", "vertex", shardId);
+ }
+ return createTable(tableName);
+ }
+
+ protected PaimonTableRWHandle createEdgeTable(int shardId) {
+ String tableName = edgeTable;
+ if (StringUtils.isEmpty(edgeTable)) {
+ tableName = String.format("%s#%s", "edge", shardId);
+ }
+ return createTable(tableName);
+ }
+
+ protected PaimonTableRWHandle createIndexTable(int shardId) {
+ String tableName = indexTable;
+ if (StringUtils.isEmpty(indexTable)) {
+ tableName = String.format("%s#%s", "vertex_index", shardId);
+ }
+ return createTable(tableName);
+ }
+
+ private PaimonTableRWHandle createTable(String tableName) {
+        Identifier vertexIndexIdentifier = new Identifier(paimonStoreName, tableName);
+ return createKVTableHandle(vertexIndexIdentifier);
+ }
}
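The net effect of the new branch above: without a configured database each shard keeps its own per-shard tables (vertex#<shardId> and so on), while distributed mode or an explicit PAIMON_STORE_DATABASE points every shard at one shared table set that the single global sink can commit for. A hedged configuration sketch (the database and table name values are invented for illustration):

    import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_DATABASE;
    import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_EDGE_TABLE;
    import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_INDEX_TABLE;
    import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_VERTEX_TABLE;

    import org.apache.geaflow.common.config.Configuration;

    public class SharedTableConfigSketch {

        // One database and one table per element type, shared by all shards,
        // so every writer task resolves to the same Paimon identifiers.
        static Configuration sharedTableConfig() {
            Configuration config = new Configuration();
            config.put(PAIMON_STORE_DATABASE.getKey(), "graph_db");
            config.put(PAIMON_STORE_VERTEX_TABLE.getKey(), "vertex");
            config.put(PAIMON_STORE_EDGE_TABLE.getKey(), "edge");
            config.put(PAIMON_STORE_INDEX_TABLE.getKey(), "vertex_index");
            return config;
        }
    }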
diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonStore.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonStore.java
index 59255dfe..767f1b08 100644
--- a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonStore.java
+++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonStore.java
@@ -19,10 +19,15 @@
package org.apache.geaflow.store.paimon;
+import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE;
+import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE;
+
import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
+import org.apache.geaflow.common.exception.GeaflowRuntimeException;
import org.apache.geaflow.store.IStatefulStore;
import org.apache.geaflow.store.api.graph.BaseGraphStore;
import org.apache.geaflow.store.context.StoreContext;
+import org.apache.paimon.catalog.Catalog.TableNotExistException;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
@@ -42,19 +47,25 @@ public abstract class BasePaimonStore extends BaseGraphStore implements IStatefu
protected static final String DIRECTION_COLUMN_NAME = "direction";
protected static final String LABEL_COLUMN_NAME = "label";
- protected PaimonTableCatalogClient client;
+ protected PaimonCatalogClient client;
protected int shardId;
protected String jobName;
protected String paimonStoreName;
protected long lastCheckpointId;
+ protected boolean isDistributedMode;
+ protected boolean enableAutoCreate;
@Override
public void init(StoreContext storeContext) {
this.shardId = storeContext.getShardId();
         this.jobName = storeContext.getConfig().getString(ExecutionConfigKeys.JOB_APP_NAME);
this.paimonStoreName = this.jobName + "#" + this.shardId;
- this.client = new PaimonTableCatalogClient(storeContext.getConfig());
+        this.client = PaimonCatalogManager.getCatalogClient(storeContext.getConfig());
this.lastCheckpointId = Long.MAX_VALUE;
+ this.isDistributedMode = storeContext.getConfig()
+ .getBoolean(PAIMON_STORE_DISTRIBUTED_MODE_ENABLE);
+ this.enableAutoCreate = storeContext.getConfig()
+ .getBoolean(PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE);
}
@Override
@@ -68,13 +79,23 @@ public abstract class BasePaimonStore extends BaseGraphStore implements IStatefu
}
protected PaimonTableRWHandle createKVTableHandle(Identifier identifier) {
- Schema.Builder schemaBuilder = Schema.newBuilder();
- schemaBuilder.primaryKey(KEY_COLUMN_NAME);
- schemaBuilder.column(KEY_COLUMN_NAME, DataTypes.BYTES());
- schemaBuilder.column(VALUE_COLUMN_NAME, DataTypes.BYTES());
- Schema schema = schemaBuilder.build();
- Table vertexTable = this.client.createTable(schema, identifier);
- return new PaimonTableRWHandle(identifier, vertexTable);
+ Table vertexTable;
+ try {
+ vertexTable = this.client.getTable(identifier);
+ } catch (TableNotExistException e) {
+ if (enableAutoCreate) {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.primaryKey(KEY_COLUMN_NAME);
+ schemaBuilder.column(KEY_COLUMN_NAME, DataTypes.BYTES());
+ schemaBuilder.column(VALUE_COLUMN_NAME, DataTypes.BYTES());
+ Schema schema = schemaBuilder.build();
+ vertexTable = this.client.createTable(schema, identifier);
+ } else {
+ throw new GeaflowRuntimeException("Table " + identifier + "
not exist.");
+ }
+ }
+
+        return new PaimonTableRWHandle(identifier, vertexTable, shardId, isDistributedMode);
}
     protected PaimonTableRWHandle createEdgeTableHandle(Identifier identifier) {
@@ -92,7 +113,7 @@ public abstract class BasePaimonStore extends BaseGraphStore implements IStatefu
schemaBuilder.column(VALUE_COLUMN_NAME, DataTypes.BYTES());
Schema schema = schemaBuilder.build();
Table vertexTable = this.client.createTable(schema, identifier);
- return new PaimonTableRWHandle(identifier, vertexTable);
+ return new PaimonTableRWHandle(identifier, vertexTable, shardId);
}
     protected PaimonTableRWHandle createVertexTableHandle(Identifier identifier) {
@@ -104,6 +125,6 @@ public abstract class BasePaimonStore extends BaseGraphStore implements IStatefu
schemaBuilder.column(VALUE_COLUMN_NAME, DataTypes.BYTES());
Schema schema = schemaBuilder.build();
Table vertexTable = this.client.createTable(schema, identifier);
- return new PaimonTableRWHandle(identifier, vertexTable);
+ return new PaimonTableRWHandle(identifier, vertexTable, shardId);
}
}
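createKVTableHandle now follows a get-or-create pattern against the Paimon catalog: look the table up first, and only create the KV schema when auto-create is enabled. A standalone sketch of the same flow using Paimon's public Catalog API (the "key"/"value" column names mirror the KV schema above but are assumptions here, and IllegalStateException stands in for GeaflowRuntimeException):

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.table.Table;
    import org.apache.paimon.types.DataTypes;

    public class GetOrCreateTableSketch {

        static Table getOrCreate(Catalog catalog, Identifier identifier, boolean autoCreate)
            throws Exception {
            try {
                // Reuse an existing table when possible.
                return catalog.getTable(identifier);
            } catch (Catalog.TableNotExistException e) {
                if (!autoCreate) {
                    throw new IllegalStateException("Table " + identifier + " not exist.", e);
                }
                // Primary-keyed bytes/bytes schema, i.e. a KV table.
                Schema schema = Schema.newBuilder()
                    .column("key", DataTypes.BYTES())
                    .column("value", DataTypes.BYTES())
                    .primaryKey("key")
                    .build();
                catalog.createDatabase(identifier.getDatabaseName(), true);
                catalog.createTable(identifier, schema, true);
                return catalog.getTable(identifier);
            }
        }
    }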
diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/DynamicGraphPaimonStoreBase.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/DynamicGraphPaimonStoreBase.java
index 8ad3bfba..9c4d202d 100644
--- a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/DynamicGraphPaimonStoreBase.java
+++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/DynamicGraphPaimonStoreBase.java
@@ -34,7 +34,6 @@ import org.apache.geaflow.store.api.graph.IDynamicGraphStore;
import org.apache.geaflow.store.context.StoreContext;
import org.apache.geaflow.store.paimon.proxy.IGraphMultiVersionedPaimonProxy;
import org.apache.geaflow.store.paimon.proxy.PaimonProxyBuilder;
-import org.apache.paimon.catalog.Identifier;
 public class DynamicGraphPaimonStoreBase<K, VV, EV> extends BasePaimonGraphStore implements
     IDynamicGraphStore<K, VV, EV> {
@@ -47,17 +46,9 @@ public class DynamicGraphPaimonStoreBase<K, VV, EV> extends BasePaimonGraphStore
int[] projection = new int[]{KEY_COLUMN_INDEX, VALUE_COLUMN_INDEX};
// TODO: Use graph schema to create table instead of KV table.
-        String vertexTableName = String.format("%s#%s", "vertex", shardId);
-        Identifier vertexIdentifier = new Identifier(paimonStoreName, vertexTableName);
-        PaimonTableRWHandle vertexHandle = createKVTableHandle(vertexIdentifier);
-
-        String vertexIndexTableName = String.format("%s#%s", "vertex_index", shardId);
-        Identifier vertexIndexIdentifier = new Identifier(paimonStoreName, vertexIndexTableName);
-        PaimonTableRWHandle vertexIndexHandle = createKVTableHandle(vertexIndexIdentifier);
-
-        String edgeTableName = String.format("%s#%s", "edge", shardId);
-        Identifier edgeIdentifier = new Identifier(paimonStoreName, edgeTableName);
-        PaimonTableRWHandle edgeHandle = createKVTableHandle(edgeIdentifier);
+        PaimonTableRWHandle vertexHandle = createVertexTable(shardId);
+        PaimonTableRWHandle vertexIndexHandle = createIndexTable(shardId);
+        PaimonTableRWHandle edgeHandle = createEdgeTable(shardId);
         IGraphKVEncoder<K, VV, EV> encoder = GraphKVEncoderFactory.build(storeContext.getConfig(),
             storeContext.getGraphSchema());
diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/KVPaimonStore.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/KVPaimonStore.java
index 718f0f7c..e2c02841 100644
--- a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/KVPaimonStore.java
+++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/KVPaimonStore.java
@@ -102,14 +102,14 @@ public class KVPaimonStore<K, V> extends BasePaimonStore implements IKVStatefulS
byte[] keyArray = this.kvSerializer.serializeKey(key);
byte[] valueArray = this.kvSerializer.serializeValue(value);
GenericRow record = GenericRow.of(keyArray, valueArray);
- this.tableHandle.write(record, 0);
+ this.tableHandle.write(record);
}
@Override
public void remove(K key) {
byte[] keyArray = this.kvSerializer.serializeKey(key);
GenericRow record = GenericRow.ofKind(RowKind.DELETE, keyArray, null);
- this.tableHandle.write(record, 0);
+ this.tableHandle.write(record);
}
@Override
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableCatalogClient.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogClient.java
similarity index 59%
rename from
geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableCatalogClient.java
rename to
geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogClient.java
index 12391880..f4d163ed 100644
---
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableCatalogClient.java
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogClient.java
@@ -19,77 +19,46 @@
package org.apache.geaflow.store.paimon;
-import static
org.apache.geaflow.store.paimon.PaimonConfigKeys.PAIMON_OPTIONS_META_STORE;
-import static
org.apache.geaflow.store.paimon.PaimonConfigKeys.PAIMON_OPTIONS_WAREHOUSE;
-
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.exception.GeaflowRuntimeException;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Catalog.TableNotExistException;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PaimonTableCatalogClient {
+public class PaimonCatalogClient {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PaimonTableCatalogClient.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PaimonCatalogClient.class);
private final Configuration config;
private Catalog catalog;
- public PaimonTableCatalogClient(Configuration config) {
+ public PaimonCatalogClient(Catalog catalog, Configuration config) {
this.config = config;
+ this.catalog = catalog;
}
public Table createTable(Schema schema, Identifier identifier) {
- this.catalog = createCatalog(config);
try {
+ LOGGER.info("create table {}", identifier.getFullName());
this.catalog.createDatabase(identifier.getDatabaseName(), true);
this.catalog.createTable(identifier, schema, true);
+ return getTable(identifier);
} catch (Exception e) {
throw new GeaflowRuntimeException("Create database or table
failed.", e);
}
- return getTable(identifier);
- }
-
- private Catalog createCatalog(Configuration config) {
- Options options = new Options();
-
- String metastore = config.getString(PAIMON_OPTIONS_META_STORE);
- String warehouse = config.getString(PAIMON_OPTIONS_WAREHOUSE);
-
- options.set(CatalogOptions.WAREHOUSE, warehouse.toLowerCase());
- options.set(CatalogOptions.METASTORE, metastore.toLowerCase());
- if (!metastore.equalsIgnoreCase("filesystem")) {
- throw new UnsupportedOperationException("Not support meta store
type " + metastore);
- }
-
- if (!warehouse.startsWith("file://")) {
- throw new UnsupportedOperationException(
- "Only support warehouse path prefix: [file://]");
- }
-
- CatalogContext context = CatalogContext.create(options);
- return CatalogFactory.createCatalog(context);
}
public Catalog getCatalog() {
return this.catalog;
}
- public Table getTable(Identifier identifier) {
- try {
- return this.catalog.getTable(identifier);
- } catch (Catalog.TableNotExistException e) {
- // do something
- throw new GeaflowRuntimeException("table not exist", e);
- }
+ public Table getTable(Identifier identifier) throws TableNotExistException
{
+ return this.catalog.getTable(identifier);
}
public void close() {
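
A short usage sketch of the reworked client API: the catalog is now injected
(see PaimonCatalogManager below) and createTable is idempotent thanks to the
ignore-if-exists flags. Database, table, and column names here are
illustrative:

    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.table.Table;
    import org.apache.paimon.types.DataTypes;

    static Table ensureKvTable(PaimonCatalogClient client) {
        // the same two-BYTES-columns layout the stores use for KV tables
        Schema schema = Schema.newBuilder()
            .column("key", DataTypes.BYTES())
            .column("value", DataTypes.BYTES())
            .build();
        return client.createTable(schema, new Identifier("graph", "vertex"));
    }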
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogManager.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogManager.java
new file mode 100644
index 00000000..b8294fc0
--- /dev/null
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.paimon;
+
+import static
org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_META_STORE;
+import static
org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_WAREHOUSE;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.store.paimon.config.PaimonStoreConfig;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PaimonCatalogManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PaimonCatalogManager.class);
+
+ private static final Map<String, PaimonCatalogClient> catalogMap = new
ConcurrentHashMap<>();
+
+ public static synchronized PaimonCatalogClient
getCatalogClient(Configuration config) {
+ String warehouse = config.getString(PAIMON_STORE_WAREHOUSE);
+ return catalogMap.computeIfAbsent(warehouse, k ->
createCatalog(config));
+ }
+
+ private static PaimonCatalogClient createCatalog(Configuration config) {
+ String metastore = config.getString(PAIMON_STORE_META_STORE);
+ String warehouse = config.getString(PAIMON_STORE_WAREHOUSE);
+
+ Options options = new Options();
+ options.set(CatalogOptions.WAREHOUSE, warehouse.toLowerCase());
+ options.set(CatalogOptions.METASTORE, metastore.toLowerCase());
+ Map<String, String> extraOptions =
PaimonStoreConfig.getPaimonOptions(config);
+ if (extraOptions != null) {
+ for (Map.Entry<String, String> entry : extraOptions.entrySet()) {
+ LOGGER.info("add option: {}={}", entry.getKey(),
entry.getValue());
+ options.set(entry.getKey(), entry.getValue());
+ }
+ }
+ if (!metastore.equalsIgnoreCase("filesystem")) {
+ throw new UnsupportedOperationException("Not support meta store
type " + metastore);
+ }
+
+ CatalogContext context = CatalogContext.create(options);
+ Catalog catalog = CatalogFactory.createCatalog(context);
+ return new PaimonCatalogClient(catalog, config);
+ }
+
+}
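
The manager memoizes one client per warehouse path, which is what lets every
shard (and store) on a worker share a single Paimon Catalog. A sketch of
obtaining it, assuming Configuration can be constructed from a plain map:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.geaflow.common.config.Configuration;

    static PaimonCatalogClient sharedClient() {
        Map<String, String> conf = new HashMap<>();
        conf.put("geaflow.store.paimon.warehouse", "file:///tmp/paimon/");
        conf.put("geaflow.store.paimon.meta.store", "FILESYSTEM");
        // repeated calls with the same warehouse return the cached client
        return PaimonCatalogManager.getCatalogClient(new Configuration(conf));
    }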
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonConfigKeys.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonConfigKeys.java
deleted file mode 100644
index 294a9c1d..00000000
---
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonConfigKeys.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.geaflow.store.paimon;
-
-import org.apache.geaflow.common.config.ConfigKey;
-import org.apache.geaflow.common.config.ConfigKeys;
-
-public class PaimonConfigKeys {
-
- public static final ConfigKey PAIMON_OPTIONS_WAREHOUSE = ConfigKeys
- .key("geaflow.store.paimon.options.warehouse")
- .defaultValue("file:///tmp/paimon/")
- .description("paimon warehouse, default LOCAL path, now support path
prefix: "
- + "[file://], Options for future: [hdfs://, oss://, s3://]");
-
- public static final ConfigKey PAIMON_OPTIONS_META_STORE = ConfigKeys
- .key("geaflow.store.paimon.options.meta.store")
- .defaultValue("FILESYSTEM")
- .description("Metastore of paimon catalog, now support [FILESYSTEM].
Options for future: "
- + "[HIVE, JDBC].");
-
- public static final ConfigKey PAIMON_OPTIONS_MEMTABLE_SIZE_MB = ConfigKeys
- .key("geaflow.store.paimon.memtable.size.mb")
- .defaultValue(128)
- .description("paimon memtable size, default 256MB");
-}
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableRWHandle.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableRWHandle.java
index c72585b2..d88ecb1c 100644
---
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableRWHandle.java
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableRWHandle.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import org.apache.geaflow.common.exception.GeaflowRuntimeException;
+import org.apache.geaflow.store.paimon.commit.PaimonCommitRegistry;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -31,27 +32,43 @@ import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.utils.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PaimonTableRWHandle {
- private Identifier identifier;
-
- private Table table;
-
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PaimonTableRWHandle.class);
+ private final int shardId;
+ private final Identifier identifier;
+ private final Table table;
+ private final boolean isDistributedMode;
+ private final PaimonCommitRegistry registry;
private StreamTableWrite streamTableWrite;
-
private List<CommitMessage> commitMessages = new ArrayList<>();
- public PaimonTableRWHandle(Identifier identifier, Table table) {
+ public PaimonTableRWHandle(Identifier identifier, Table table, int
shardId) {
+ this(identifier, table, shardId, false);
+ }
+
+ public PaimonTableRWHandle(Identifier identifier, Table table, int shardId,
+ boolean isDistributedMode) {
+ this.shardId = shardId;
this.identifier = identifier;
this.table = table;
this.streamTableWrite = table.newStreamWriteBuilder().newWrite();
+ this.registry = PaimonCommitRegistry.initInstance();
+ this.isDistributedMode = isDistributedMode;
+ }
+
+ public void write(GenericRow row) {
+ write(row, shardId);
}
public void write(GenericRow row, int bucket) {
@@ -63,13 +80,34 @@ public class PaimonTableRWHandle {
}
public void commit(long checkpointId) {
- try (StreamTableCommit commit =
table.newStreamWriteBuilder().newCommit()) {
- flush(checkpointId);
- commit.commit(checkpointId, commitMessages);
- commitMessages.clear();
- } catch (Exception e) {
- throw new GeaflowRuntimeException("Failed to commit data into
Paimon.", e);
+ commit(checkpointId, false);
+ }
+
+ public void commit(long checkpointId, boolean waitCompaction) {
+ flush(checkpointId, waitCompaction);
+ List<CommitMessage> messages = new ArrayList<>();
+ for (CommitMessage commitMessage : commitMessages) {
+ if (commitMessage instanceof CommitMessageImpl
+ && ((CommitMessageImpl) commitMessage).isEmpty()) {
+ continue;
+ }
+ messages.add(commitMessage);
}
+
+ if (isDistributedMode) {
+ LOGGER.info("{} pre commit chkId:{} messages:{} wait:{}",
+ this.identifier, checkpointId, messages.size(),
waitCompaction);
+ registry.addMessages(shardId, table.name(), messages);
+ } else {
+ LOGGER.info("{} commit chkId:{} messages:{} wait:{}",
+ this.identifier, checkpointId, messages.size(),
waitCompaction);
+ try (StreamTableCommit commit =
table.newStreamWriteBuilder().newCommit()) {
+ commit.commit(checkpointId, messages);
+ } catch (Exception e) {
+ throw new GeaflowRuntimeException("Failed to commit data into
Paimon.", e);
+ }
+ }
+ commitMessages.clear();
}
public void rollbackTo(long snapshotId) {
@@ -92,6 +130,8 @@ public class PaimonTableRWHandle {
if (predicate != null) {
readBuilder.withFilter(predicate);
}
+ readBuilder.withBucketFilter(bucketId -> bucketId == shardId);
+
List<Split> splits = readBuilder.newScan().plan().splits();
TableRead tableRead = readBuilder.newRead();
if (predicate != null) {
@@ -108,8 +148,13 @@ public class PaimonTableRWHandle {
}
public void flush(long checkpointIdentifier) {
+ flush(checkpointIdentifier, false);
+ }
+
+ public void flush(long checkpointIdentifier, boolean waitCompaction) {
try {
- this.commitMessages.addAll(streamTableWrite.prepareCommit(false,
checkpointIdentifier));
+ this.commitMessages.addAll(
+ streamTableWrite.prepareCommit(waitCompaction,
checkpointIdentifier));
} catch (Exception e) {
throw new GeaflowRuntimeException("Failed to flush data into
Paimon.", e);
}
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/StaticGraphPaimonStoreBase.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/StaticGraphPaimonStoreBase.java
index 42079eec..4b1dfe66 100644
---
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/StaticGraphPaimonStoreBase.java
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/StaticGraphPaimonStoreBase.java
@@ -35,7 +35,6 @@ import org.apache.geaflow.store.api.graph.IStaticGraphStore;
import org.apache.geaflow.store.context.StoreContext;
import org.apache.geaflow.store.paimon.proxy.IGraphPaimonProxy;
import org.apache.geaflow.store.paimon.proxy.PaimonProxyBuilder;
-import org.apache.paimon.catalog.Identifier;
public class StaticGraphPaimonStoreBase<K, VV, EV> extends
BasePaimonGraphStore implements
IStaticGraphStore<K, VV, EV> {
@@ -49,14 +48,8 @@ public class StaticGraphPaimonStoreBase<K, VV, EV> extends
BasePaimonGraphStore
super.init(storeContext);
int[] projection = new int[]{KEY_COLUMN_INDEX, VALUE_COLUMN_INDEX};
- // TODO: Use graph schema to create table instead of KV table.
- String vertexTableName = String.format("%s#%s", "vertex", shardId);
- Identifier vertexIdentifier = new Identifier(paimonStoreName,
vertexTableName);
- PaimonTableRWHandle vertexHandle =
createKVTableHandle(vertexIdentifier);
-
- String edgeTableName = String.format("%s#%s", "edge", shardId);
- Identifier edgeIdentifier = new Identifier(paimonStoreName,
edgeTableName);
- PaimonTableRWHandle edgeHandle = createKVTableHandle(edgeIdentifier);
+ PaimonTableRWHandle vertexHandle = createVertexTable(shardId);
+ PaimonTableRWHandle edgeHandle = createEdgeTable(shardId);
this.sortAtom = storeContext.getGraphSchema().getEdgeAtoms().get(1);
IGraphKVEncoder<K, VV, EV> encoder =
GraphKVEncoderFactory.build(storeContext.getConfig(),
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonCommitRegistry.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonCommitRegistry.java
new file mode 100644
index 00000000..8468a410
--- /dev/null
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonCommitRegistry.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.paimon.commit;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.paimon.table.sink.CommitMessage;
+
+public class PaimonCommitRegistry {
+
+ private static PaimonCommitRegistry instance;
+
+ private final ConcurrentHashMap<Integer, List<TaskCommitMessage>>
index2CommitMessages =
+ new ConcurrentHashMap<>();
+
+ public static synchronized PaimonCommitRegistry initInstance() {
+ if (instance == null) {
+ instance = new PaimonCommitRegistry();
+ }
+ return instance;
+ }
+
+ public static synchronized PaimonCommitRegistry getInstance() {
+ return instance;
+ }
+
+ public synchronized void addMessages(int index, String tableName,
+ List<CommitMessage> commitMessages) {
+ List<TaskCommitMessage> message =
index2CommitMessages.computeIfAbsent(index,
+ key -> new ArrayList<>());
+ message.add(new TaskCommitMessage(tableName, commitMessages));
+ }
+
+ public synchronized List<TaskCommitMessage> pollMessages(int index) {
+ List<TaskCommitMessage> messages = index2CommitMessages.get(index);
+ if (messages != null && !messages.isEmpty()) {
+ List<TaskCommitMessage> result = new ArrayList<>(messages);
+ messages.clear();
+ return result;
+ }
+ return messages;
+ }
+
+ public static class TaskCommitMessage implements Serializable {
+ private final String tableName;
+ private final List<CommitMessage> messages;
+
+ public TaskCommitMessage(String tableName, List<CommitMessage>
messages) {
+ this.tableName = tableName;
+ this.messages = messages;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public List<CommitMessage> getMessages() {
+ return messages;
+ }
+ }
+}
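
On the consumer side, a committer drains what the writers registered for its
shard; pollMessages copies and clears under the lock, so each batch is seen
exactly once (and may be null if nothing was ever registered). A sketch of
the drain step; committing per table is assumed:

    import java.util.List;
    import org.apache.geaflow.store.paimon.commit.PaimonCommitRegistry;
    import org.apache.geaflow.store.paimon.commit.PaimonCommitRegistry.TaskCommitMessage;

    static void drainShard(int shardId) {
        PaimonCommitRegistry registry = PaimonCommitRegistry.getInstance();
        List<TaskCommitMessage> batch =
            registry == null ? null : registry.pollMessages(shardId);
        if (batch == null) {
            return;
        }
        for (TaskCommitMessage message : batch) {
            // hypothetical: group by message.getTableName() and hand
            // message.getMessages() to that table's StreamTableCommit
        }
    }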
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonMessage.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonMessage.java
new file mode 100644
index 00000000..686ef8f4
--- /dev/null
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonMessage.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.paimon.commit;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.geaflow.common.exception.GeaflowRuntimeException;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PaimonMessage implements Serializable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PaimonMessage.class);
+ private static final ThreadLocal<CommitMessageSerializer> CACHE =
ThreadLocal.withInitial(
+ CommitMessageSerializer::new);
+
+ private transient List<CommitMessage> messages;
+ private byte[] messageBytes;
+ private int serializerVersion;
+ private String tableName;
+ private long chkId;
+
+ public PaimonMessage(long chkId, String tableName, List<CommitMessage>
messages) {
+ this.chkId = chkId;
+ this.tableName = tableName;
+
+ CommitMessageSerializer serializer = CACHE.get();
+ this.serializerVersion = serializer.getVersion();
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try {
+ serializer.serializeList(messages, new
DataOutputViewStreamWrapper(out));
+ this.messageBytes = out.toByteArray();
+            LOGGER.info("serialized {} bytes of commit messages", messageBytes.length);
+ } catch (Exception e) {
+ LOGGER.error("serialize message error", e);
+ throw new GeaflowRuntimeException(e);
+ }
+ }
+
+ public List<CommitMessage> getMessages() {
+ if (messages == null) {
+ CommitMessageSerializer serializer = CACHE.get();
+ try {
+                if (messageBytes == null) {
+                    throw new GeaflowRuntimeException(
+                        "Cannot deserialize commit messages: bytes are null.");
+                }
+ ByteArrayInputStream in = new
ByteArrayInputStream(messageBytes);
+ messages = serializer.deserializeList(serializerVersion,
+ new DataInputViewStreamWrapper(in));
+ } catch (Exception e) {
+ LOGGER.error("deserialize message error", e);
+ throw new GeaflowRuntimeException(e);
+ }
+ }
+ return messages;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public long getCheckpointId() {
+ return chkId;
+ }
+}
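
PaimonMessage is the wire envelope for shipping commit messages between
workers: Paimon's CommitMessage is not Java-serializable (hence the
transient field), so the constructor eagerly encodes with Paimon's
CommitMessageSerializer and getMessages() lazily decodes at the receiver. A
round-trip sketch:

    import java.util.List;
    import org.apache.geaflow.store.paimon.commit.PaimonMessage;
    import org.apache.paimon.table.sink.CommitMessage;

    static List<CommitMessage> roundTrip(long checkpointId, String tableName,
                                         List<CommitMessage> messages) {
        // only messageBytes plus the serializer version survive Java
        // serialization of the envelope
        PaimonMessage envelope =
            new PaimonMessage(checkpointId, tableName, messages);
        return envelope.getMessages();
    }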
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonConfigKeys.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonConfigKeys.java
new file mode 100644
index 00000000..1e4c80b1
--- /dev/null
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonConfigKeys.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.paimon.config;
+
+import org.apache.geaflow.common.config.ConfigKey;
+import org.apache.geaflow.common.config.ConfigKeys;
+
+public class PaimonConfigKeys {
+
+ public static final ConfigKey PAIMON_STORE_WAREHOUSE = ConfigKeys
+ .key("geaflow.store.paimon.warehouse")
+ .defaultValue("file:///tmp/paimon/")
+ .description("paimon warehouse, default LOCAL path, now support path
prefix: "
+ + "[file://], Options for future: [hdfs://, oss://, s3://]");
+
+ public static final ConfigKey PAIMON_STORE_META_STORE = ConfigKeys
+ .key("geaflow.store.paimon.meta.store")
+ .defaultValue("FILESYSTEM")
+ .description("Metastore of paimon catalog, now support [FILESYSTEM].
Options for future: "
+ + "[HIVE, JDBC].");
+
+ public static final ConfigKey PAIMON_STORE_OPTIONS = ConfigKeys
+ .key("geaflow.store.paimon.options")
+        .defaultValue("")
+        .description("extra paimon options as comma-separated key=value pairs, "
+            + "e.g. [bucket=4,manifest.format=orc]");
+
+ public static final ConfigKey PAIMON_STORE_DATABASE = ConfigKeys
+ .key("geaflow.store.paimon.database")
+ .defaultValue("graph")
+ .description("paimon graph store database");
+
+ public static final ConfigKey PAIMON_STORE_VERTEX_TABLE = ConfigKeys
+ .key("geaflow.store.paimon.vertex.table")
+ .defaultValue("vertex")
+ .description("paimon graph store vertex table name");
+
+ public static final ConfigKey PAIMON_STORE_EDGE_TABLE = ConfigKeys
+ .key("geaflow.store.paimon.edge.table")
+ .defaultValue("edge")
+ .description("paimon graph store edge table name");
+
+ public static final ConfigKey PAIMON_STORE_INDEX_TABLE = ConfigKeys
+ .key("geaflow.store.paimon.index.table")
+ .defaultValue("index")
+ .description("paimon graph store index table name");
+
+ public static final ConfigKey PAIMON_STORE_DISTRIBUTED_MODE_ENABLE =
ConfigKeys
+ .key("geaflow.store.paimon.distributed.mode.enable")
+ .defaultValue(true)
+        .description("whether to enable distributed writing with one shared "
+            + "paimon graph store");
+
+ public static final ConfigKey PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE =
ConfigKeys
+ .key("geaflow.store.paimon.table.auto.create")
+ .defaultValue(false)
+        .description("whether to automatically create paimon graph store "
+            + "tables when they are absent");
+
+}
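
For reference, a configuration map exercising the new key set; the values
are illustrative, and the paimon options shown are ordinary Paimon table
options rather than keys defined by this commit:

    import java.util.HashMap;
    import java.util.Map;

    static Map<String, String> examplePaimonStoreConf() {
        Map<String, String> conf = new HashMap<>();
        conf.put("geaflow.store.paimon.warehouse", "file:///tmp/paimon/");
        conf.put("geaflow.store.paimon.meta.store", "FILESYSTEM");
        conf.put("geaflow.store.paimon.options", "bucket=4");
        conf.put("geaflow.store.paimon.distributed.mode.enable", "true");
        conf.put("geaflow.store.paimon.table.auto.create", "false");
        return conf;
    }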
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonStoreConfig.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonStoreConfig.java
new file mode 100644
index 00000000..e82da589
--- /dev/null
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonStoreConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.paimon.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.apache.geaflow.common.config.Configuration;
+
+public class PaimonStoreConfig {
+
+ public static Map<String, String> getPaimonOptions(Configuration config) {
+ String s = config.getString(PaimonConfigKeys.PAIMON_STORE_OPTIONS,
"");
+ if (StringUtils.isEmpty(s)) {
+ return null;
+ }
+ Map<String, String> options = new HashMap<>();
+ String[] pairs = s.trim().split(",");
+ for (String pair : pairs) {
+ String[] kv = pair.trim().split("=");
+ if (kv.length < 2) {
+ continue;
+ }
+ options.put(kv[0], kv[1]);
+ }
+ return options;
+ }
+
+}
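
A usage note on the parser above: pairs are comma-separated, a pair without
'=' is skipped silently, and anything after a second '=' within one pair is
dropped. A hedged example, again assuming Configuration(Map):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.geaflow.common.config.Configuration;
    import org.apache.geaflow.store.paimon.config.PaimonStoreConfig;

    static Map<String, String> parsedOptions() {
        Configuration config = new Configuration(Collections.singletonMap(
            "geaflow.store.paimon.options", "bucket=4, manifest.format=orc"));
        // yields {bucket=4, manifest.format=orc}
        return PaimonStoreConfig.getPaimonOptions(config);
    }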
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonBaseGraphProxy.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonBaseGraphProxy.java
index 2723988a..7fd237c3 100644
---
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonBaseGraphProxy.java
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonBaseGraphProxy.java
@@ -80,14 +80,14 @@ public abstract class PaimonBaseGraphProxy<K, VV, EV>
implements IGraphPaimonPro
public void addEdge(IEdge<K, EV> edge) {
Tuple<byte[], byte[]> tuple =
this.encoder.getEdgeEncoder().format(edge);
GenericRow record = GenericRow.of(tuple.f0, tuple.f1);
- this.edgeHandle.write(record, 0);
+ this.edgeHandle.write(record);
}
@Override
public void addVertex(IVertex<K, VV> vertex) {
Tuple<byte[], byte[]> tuple =
this.encoder.getVertexEncoder().format(vertex);
GenericRow record = GenericRow.of(tuple.f0, tuple.f1);
- this.vertexHandle.write(record, 0);
+ this.vertexHandle.write(record);
}
@Override
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonGraphMultiVersionedProxy.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonGraphMultiVersionedProxy.java
index 3d91262c..0e77dc2b 100644
---
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonGraphMultiVersionedProxy.java
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonGraphMultiVersionedProxy.java
@@ -132,7 +132,7 @@ public class PaimonGraphMultiVersionedProxy<K, VV, EV>
implements
Tuple<byte[], byte[]> tuple = edgeEncoder.format(edge);
byte[] bVersion = getBinaryVersion(version);
GenericRow record = GenericRow.of(concat(bVersion, tuple.f0),
tuple.f1);
- this.edgeHandle.write(record, 0);
+ this.edgeHandle.write(record);
}
@Override
@@ -141,8 +141,8 @@ public class PaimonGraphMultiVersionedProxy<K, VV, EV>
implements
byte[] bVersion = getBinaryVersion(version);
GenericRow record = GenericRow.of(concat(bVersion, tuple.f0),
tuple.f1);
GenericRow index = GenericRow.of(concat(tuple.f0, bVersion),
EMPTY_BYTES);
- this.vertexHandle.write(record, 0);
- this.vertexIndexHandle.write(index, 0);
+ this.vertexHandle.write(record);
+ this.vertexIndexHandle.write(index);
}
@Override
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/test/java/org/apache/geaflow/store/paimon/PaimonRWHandleTest.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/test/java/org/apache/geaflow/store/paimon/PaimonRWHandleTest.java
index 4856d88c..a951c183 100644
---
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/test/java/org/apache/geaflow/store/paimon/PaimonRWHandleTest.java
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/test/java/org/apache/geaflow/store/paimon/PaimonRWHandleTest.java
@@ -97,7 +97,7 @@ public class PaimonRWHandleTest {
value.getBytes() // value
);
- edgeHandle.write(row, 0);
+ edgeHandle.write(row);
long checkpointId = 1L;
edgeHandle.commit(checkpointId);
diff --git
a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonDynamicGraphStateTest.java
b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonDynamicGraphStateTest.java
index 371598a9..2922d7e5 100644
---
a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonDynamicGraphStateTest.java
+++
b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonDynamicGraphStateTest.java
@@ -45,7 +45,7 @@ import org.apache.geaflow.state.data.OneDegreeGraph;
import org.apache.geaflow.state.descriptor.GraphStateDescriptor;
import org.apache.geaflow.state.pushdown.filter.IEdgeFilter;
import org.apache.geaflow.state.pushdown.filter.IVertexFilter;
-import org.apache.geaflow.store.paimon.PaimonConfigKeys;
+import org.apache.geaflow.store.paimon.config.PaimonConfigKeys;
import org.apache.geaflow.utils.keygroup.DefaultKeyGroupAssigner;
import org.apache.geaflow.utils.keygroup.KeyGroup;
import org.testng.Assert;
@@ -67,8 +67,9 @@ public class PaimonDynamicGraphStateTest {
config.put(FileConfigKeys.PERSISTENT_TYPE.getKey(), "LOCAL");
config.put(FileConfigKeys.ROOT.getKey(), "/tmp/geaflow/chk/");
config.put(FileConfigKeys.JSON_CONFIG.getKey(),
GsonUtil.toJson(persistConfig));
- config.put(PaimonConfigKeys.PAIMON_OPTIONS_WAREHOUSE.getKey(),
+ config.put(PaimonConfigKeys.PAIMON_STORE_WAREHOUSE.getKey(),
"file:///tmp/PaimonDynamicGraphStateTest/");
+
config.put(PaimonConfigKeys.PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE.getKey(),
"true");
}
@AfterClass
@@ -98,13 +99,23 @@ public class PaimonDynamicGraphStateTest {
@Test
public void testWriteRead() {
- testApi(false);
+ Map<String, String> conf = new HashMap<>(config);
+ conf.put(StateConfigKeys.STATE_WRITE_ASYNC_ENABLE.getKey(), "false");
+
conf.put(PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(),
"false");
+ testApi(conf);
}
- private void testApi(boolean async) {
- Map<String, String> conf = config;
+ @Test
+ public void testWriteRead2() {
+ Map<String, String> conf = new HashMap<>(config);
+ conf.put(StateConfigKeys.STATE_WRITE_ASYNC_ENABLE.getKey(), "false");
+
conf.put(PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(),
"false");
+ conf.put(PaimonConfigKeys.PAIMON_STORE_DATABASE.getKey(), "graph");
+ testApi(conf);
+ }
+
+ private void testApi(Map<String, String> conf) {
conf.put(StateConfigKeys.STATE_WRITE_BUFFER_SIZE.getKey(), "100");
- conf.put(StateConfigKeys.STATE_WRITE_ASYNC_ENABLE.getKey(),
String.valueOf(async));
GraphState<String, String, String> graphState =
getGraphState(StringType.INSTANCE, "testApi", conf);
graphState.manage().operate().setCheckpointId(1);
@@ -126,7 +137,6 @@ public class PaimonDynamicGraphStateTest {
graphState.dynamicGraph().E().add(4L, new ValueEdge<>("1", "2", "6"));
graphState.manage().operate().finish();
graphState.manage().operate().archive();
- ;
List<IEdge<String, String>> list =
graphState.dynamicGraph().E().query(1L, "1").asList();
Assert.assertEquals(list.size(), 2);
@@ -191,7 +201,9 @@ public class PaimonDynamicGraphStateTest {
@Test
public void testKeyGroup() {
- Map<String, String> conf = config;
+ Map<String, String> conf = new HashMap<>(config);
+
conf.put(PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(),
"false");
+
conf.put(PaimonConfigKeys.PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE.getKey(),
"true");
GraphState<String, String, String> graphState =
getGraphState(StringType.INSTANCE, "testKeyGroup", conf);
graphState.manage().operate().setCheckpointId(1);
diff --git
a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
index 81937688..ed1305dc 100644
---
a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
+++
b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
@@ -19,13 +19,16 @@
package org.apache.geaflow.state;
+import static
org.apache.geaflow.common.config.keys.StateConfigKeys.STATE_WRITE_ASYNC_ENABLE;
+import static
org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_DATABASE;
+import static
org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_WAREHOUSE;
+
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
-import org.apache.geaflow.common.config.keys.StateConfigKeys;
import org.apache.geaflow.common.type.IType;
import org.apache.geaflow.common.type.primitive.StringType;
import org.apache.geaflow.common.utils.GsonUtil;
@@ -36,7 +39,7 @@ import org.apache.geaflow.model.graph.meta.GraphMeta;
import org.apache.geaflow.model.graph.meta.GraphMetaType;
import org.apache.geaflow.model.graph.vertex.impl.ValueVertex;
import org.apache.geaflow.state.descriptor.GraphStateDescriptor;
-import org.apache.geaflow.store.paimon.PaimonConfigKeys;
+import org.apache.geaflow.store.paimon.config.PaimonConfigKeys;
import org.apache.geaflow.utils.keygroup.DefaultKeyGroupAssigner;
import org.apache.geaflow.utils.keygroup.KeyGroup;
import org.testng.Assert;
@@ -58,8 +61,11 @@ public class PaimonGraphStateTest {
config.put(FileConfigKeys.PERSISTENT_TYPE.getKey(), "LOCAL");
config.put(FileConfigKeys.ROOT.getKey(), "/tmp/geaflow/chk/");
config.put(FileConfigKeys.JSON_CONFIG.getKey(),
GsonUtil.toJson(persistConfig));
- config.put(PaimonConfigKeys.PAIMON_OPTIONS_WAREHOUSE.getKey(),
+ config.put(PaimonConfigKeys.PAIMON_STORE_WAREHOUSE.getKey(),
"file:///tmp/PaimonGraphStateTest/");
+
config.put(PaimonConfigKeys.PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE.getKey(),
"true");
+
config.put(PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(),
"false");
+
}
@AfterClass
@@ -87,9 +93,7 @@ public class PaimonGraphStateTest {
return graphState;
}
- public void testWriteRead(boolean async) {
- Map<String, String> conf = new HashMap<>(config);
- conf.put(StateConfigKeys.STATE_WRITE_ASYNC_ENABLE.getKey(),
Boolean.toString(async));
+ public void testWriteRead(Map<String, String> conf) {
GraphState<String, String, String> graphState =
getGraphState(StringType.INSTANCE,
"write_read", conf);
@@ -102,6 +106,7 @@ public class PaimonGraphStateTest {
graphState.staticGraph().E().add(new ValueEdge<>("1", id,
"edge_hello"));
}
// read nothing since not committed
+ boolean async =
Boolean.parseBoolean(conf.get(STATE_WRITE_ASYNC_ENABLE.getKey()));
if (!async) {
Assert.assertNull(graphState.staticGraph().V().query("1").get());
Assert.assertEquals(graphState.staticGraph().E().query("1").asList().size(), 0);
@@ -150,8 +155,24 @@ public class PaimonGraphStateTest {
@Test
public void testBothWriteMode() {
- testWriteRead(true);
- testWriteRead(false);
+ Map<String, String> conf = new HashMap<>(config);
+ conf.put(STATE_WRITE_ASYNC_ENABLE.getKey(), Boolean.TRUE.toString());
+ testWriteRead(conf);
+
+        conf.put(STATE_WRITE_ASYNC_ENABLE.getKey(), Boolean.FALSE.toString());
+ testWriteRead(conf);
+ }
+
+ @Test
+ public void testBothWriteMode2() {
+ Map<String, String> conf = new HashMap<>(config);
+ conf.put(STATE_WRITE_ASYNC_ENABLE.getKey(), Boolean.TRUE.toString());
+ conf.put(PAIMON_STORE_WAREHOUSE.getKey(), "/tmp/testBothWriteMode2");
+ conf.put(PAIMON_STORE_DATABASE.getKey(), "graph");
+ testWriteRead(conf);
+
+        conf.put(STATE_WRITE_ASYNC_ENABLE.getKey(), Boolean.FALSE.toString());
+ testWriteRead(conf);
}
}
diff --git
a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonKeyStateTest.java
b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonKeyStateTest.java
index 0f254149..4d640b90 100644
---
a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonKeyStateTest.java
+++
b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonKeyStateTest.java
@@ -27,7 +27,7 @@ import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
import org.apache.geaflow.file.FileConfigKeys;
import org.apache.geaflow.state.descriptor.KeyMapStateDescriptor;
-import org.apache.geaflow.store.paimon.PaimonConfigKeys;
+import org.apache.geaflow.store.paimon.config.PaimonConfigKeys;
import org.apache.geaflow.utils.keygroup.DefaultKeyGroupAssigner;
import org.apache.geaflow.utils.keygroup.KeyGroup;
import org.testng.Assert;
@@ -46,7 +46,10 @@ public class PaimonKeyStateTest {
config.put(ExecutionConfigKeys.JOB_APP_NAME.getKey(),
"PaimonKeyStateTest");
config.put(FileConfigKeys.PERSISTENT_TYPE.getKey(), "LOCAL");
config.put(FileConfigKeys.ROOT.getKey(), "/tmp/geaflow/chk/");
- config.put(PaimonConfigKeys.PAIMON_OPTIONS_WAREHOUSE.getKey(),
"file:///tmp/PaimonKeyStateTest/");
+ config.put(PaimonConfigKeys.PAIMON_STORE_WAREHOUSE.getKey(),
"file:///tmp"
+ + "/PaimonKeyStateTest/");
+
config.put(PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(),
"false");
+
config.put(PaimonConfigKeys.PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE.getKey(),
"true");
}
@AfterClass
@@ -72,7 +75,7 @@ public class PaimonKeyStateTest {
Assert.assertEquals(mapState.get("hello").size(), 0);
// commit chk = 1, now be able to read data.
mapState.manage().operate().archive();
- Assert.assertEquals(mapState.get("hello").size(), 4);
+ Assert.assertEquals(mapState.get("hello").size(), 6);
// set chk = 2
mapState.manage().operate().setCheckpointId(2L);
@@ -81,7 +84,7 @@ public class PaimonKeyStateTest {
conf2.put("conf2", "test");
mapState.put("hello2", conf2);
// cannot read data with chk = 2 since chk2 not committed.
- Assert.assertEquals(mapState.get("hello").size(), 4);
+ Assert.assertEquals(mapState.get("hello").size(), 6);
Assert.assertEquals(mapState.get("hello2").size(), 0);
// commit chk = 2
@@ -89,8 +92,8 @@ public class PaimonKeyStateTest {
mapState.manage().operate().archive();
// now be able to read data
- Assert.assertEquals(mapState.get("hello").size(), 4);
- Assert.assertEquals(mapState.get("hello2").size(), 5);
+ Assert.assertEquals(mapState.get("hello").size(), 6);
+ Assert.assertEquals(mapState.get("hello2").size(), 7);
// read data which not exists
Assert.assertEquals(mapState.get("hello3").size(), 0);
diff --git a/geaflow/pom.xml b/geaflow/pom.xml
index 029b43fb..652f8d88 100644
--- a/geaflow/pom.xml
+++ b/geaflow/pom.xml
@@ -50,7 +50,7 @@
<fastjson.version>1.2.71_noneautotype</fastjson.version>
<hikaricp.version>4.0.3</hikaricp.version>
<sqlite.version>3.40.0.0</sqlite.version>
- <zstd.version>1.4.3-1</zstd.version>
+ <zstd.version>1.5.5-11</zstd.version>
<rocksdb.version>7.7.3</rocksdb.version>
<paimon.version>1.0.1</paimon.version>
<snappy.version>1.1.8.4</snappy.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]