This is an automated email from the ASF dual-hosted git repository.

qingwzhao pushed a commit to branch paimon_store_distributed
in repository https://gitbox.apache.org/repos/asf/geaflow.git
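At a high level, the patch below splits a Paimon commit into two phases: during checkpoint, each shard's writer prepares its Paimon commit messages and parks them in a process-local PaimonCommitRegistry instead of committing directly, and a new parallelism-1 PaimonGlobalSink then drains those messages and performs one commit per checkpoint and table. The following minimal, self-contained Java sketch models only that registry hand-off; it uses plain strings in place of Paimon's CommitMessage, and the names CommitRegistrySketch, preCommit and poll are invented for illustration, not taken from the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CommitRegistrySketch {

    // Shard-local writers enqueue "commit messages" keyed by shard id;
    // a single global committer later drains and commits them.
    private static final Map<Integer, List<String>> PENDING = new ConcurrentHashMap<>();

    // Phase 1: called by a shard's writer at checkpoint time
    // (assumes one writer per shard, so the list is not contended).
    static void preCommit(int shardId, String message) {
        PENDING.computeIfAbsent(shardId, k -> new ArrayList<>()).add(message);
    }

    // Phase 2: called by the single committer; remove-and-return so each
    // batch of messages is handed over at most once.
    static List<String> poll(int shardId) {
        return PENDING.remove(shardId);
    }

    public static void main(String[] args) {
        preCommit(0, "table=vertex#0, newFiles=2");
        preCommit(0, "table=edge#0, newFiles=1");
        System.out.println("global sink commits: " + poll(0));
    }
}

In the actual patch the registry additionally groups messages per table (TaskCommitMessage), and GraphViewMaterializeOp.doCheckpoint polls it and forwards the result downstream as PaimonMessage records.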
commit c5f2bfd4c0a227b6ebb0af668d76a98c25ff36fe
Author: qingwen.zqw <[email protected]>
AuthorDate: Tue Sep 16 17:13:59 2025 +0800

    feat: support paimon store in distributed mode
---
 .../java/org/apache/geaflow/view/IViewDesc.java    |   4 +-
 .../graph/materialize/GraphViewMaterializeOp.java  |  33 ++++-
 .../impl/graph/materialize/PaimonGlobalSink.java   | 142 +++++++++++++++++++
 .../view/materialize/MaterializedIncGraph.java     |  16 ++-
 .../pdata/stream/window/WindowStreamSink.java      |   3 +-
 .../apache/geaflow/plan/PipelinePlanBuilder.java   | 153 +++++++++++++++------
 .../org/apache/geaflow/plan/PipelinePlanTest.java  |  32 +++--
 .../geaflow/dsl/runtime/query/GQLInsertTest.java   |  12 ++
 .../resources/expect/gql_insert_and_graph_006.txt  |   0
 .../resources/query/gql_insert_and_graph_006.sql   |  70 ++++++++++
 .../geaflow/store/paimon/BasePaimonGraphStore.java |  54 +++++++-
 .../geaflow/store/paimon/BasePaimonStore.java      |  43 ++++--
 .../store/paimon/DynamicGraphPaimonStoreBase.java  |  15 +-
 .../apache/geaflow/store/paimon/KVPaimonStore.java |   4 +-
 ...CatalogClient.java => PaimonCatalogClient.java} |  46 ++-----
 .../geaflow/store/paimon/PaimonCatalogManager.java |  71 ++++++++++
 .../geaflow/store/paimon/PaimonConfigKeys.java     |  43 ------
 .../geaflow/store/paimon/PaimonTableRWHandle.java  |  70 ++++++++--
 .../store/paimon/StaticGraphPaimonStoreBase.java   |  11 +-
 .../store/paimon/commit/PaimonCommitRegistry.java  |  80 +++++++++++
 .../geaflow/store/paimon/commit/PaimonMessage.java |  89 ++++++++++++
 .../store/paimon/config/PaimonConfigKeys.java      |  74 ++++++++++
 .../store/paimon/config/PaimonStoreConfig.java     |  46 +++++++
 .../store/paimon/proxy/PaimonBaseGraphProxy.java   |   4 +-
 .../proxy/PaimonGraphMultiVersionedProxy.java      |   6 +-
 .../geaflow/store/paimon/PaimonRWHandleTest.java   |   2 +-
 .../geaflow/state/PaimonDynamicGraphStateTest.java |  26 ++--
 .../apache/geaflow/state/PaimonGraphStateTest.java |  37 +++--
 .../apache/geaflow/state/PaimonKeyStateTest.java   |  14 +-
 geaflow/pom.xml                                    |   2 +-
 30 files changed, 985 insertions(+), 217 deletions(-)

diff --git a/geaflow/geaflow-core/geaflow-api/src/main/java/org/apache/geaflow/view/IViewDesc.java b/geaflow/geaflow-core/geaflow-api/src/main/java/org/apache/geaflow/view/IViewDesc.java
index 272d896d..e2df3bbe 100644
--- a/geaflow/geaflow-core/geaflow-api/src/main/java/org/apache/geaflow/view/IViewDesc.java
+++ b/geaflow/geaflow-core/geaflow-api/src/main/java/org/apache/geaflow/view/IViewDesc.java
@@ -58,12 +58,14 @@ public interface IViewDesc extends Serializable {
     }

    enum BackendType {
-        // Default view backend, current is pangu.
+        // Default view backend, currently cstore.
        Native,
        // RocksDB backend.
        RocksDB,
        // Memory backend.
        Memory,
+        // Paimon backend.
+        Paimon,
        // Custom backend.
Custom; diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/GraphViewMaterializeOp.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/GraphViewMaterializeOp.java index fa4d6aef..46b44048 100644 --- a/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/GraphViewMaterializeOp.java +++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/GraphViewMaterializeOp.java @@ -24,6 +24,7 @@ import static org.apache.geaflow.operator.Constants.GRAPH_VERSION; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.IOException; +import java.util.List; import org.apache.geaflow.api.graph.materialize.GraphMaterializeFunction; import org.apache.geaflow.api.trait.CheckpointTrait; import org.apache.geaflow.api.trait.TransactionTrait; @@ -36,12 +37,17 @@ import org.apache.geaflow.state.DataModel; import org.apache.geaflow.state.GraphState; import org.apache.geaflow.state.StateFactory; import org.apache.geaflow.state.descriptor.GraphStateDescriptor; +import org.apache.geaflow.store.paimon.commit.PaimonCommitRegistry; +import org.apache.geaflow.store.paimon.commit.PaimonCommitRegistry.TaskCommitMessage; +import org.apache.geaflow.store.paimon.commit.PaimonMessage; import org.apache.geaflow.utils.keygroup.IKeyGroupAssigner; import org.apache.geaflow.utils.keygroup.KeyGroup; import org.apache.geaflow.utils.keygroup.KeyGroupAssignerFactory; import org.apache.geaflow.utils.keygroup.KeyGroupAssignment; +import org.apache.geaflow.view.IViewDesc.BackendType; import org.apache.geaflow.view.graph.GraphViewDesc; import org.apache.geaflow.view.meta.ViewMetaBookKeeper; +import org.apache.paimon.table.sink.CommitMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +78,8 @@ public class GraphViewMaterializeOp<K, VV, EV> extends AbstractOneInputOperator< int maxPara = graphViewDesc.getShardNum(); int taskPara = runtimeContext.getTaskArgs().getParallelism(); Preconditions.checkArgument(taskPara <= maxPara, - String.format("task parallelism '%s' must be <= shard num(max parallelism) '%s'", taskPara, maxPara)); + String.format("task parallelism '%s' must be <= shard num(max parallelism) '%s'", + taskPara, maxPara)); int taskIndex = runtimeContext.getTaskArgs().getTaskIndex(); KeyGroup keyGroup = KeyGroupAssignment.computeKeyGroupRangeForOperatorIndex(maxPara, @@ -85,7 +92,8 @@ public class GraphViewMaterializeOp<K, VV, EV> extends AbstractOneInputOperator< int taskId = runtimeContext.getTaskArgs().getTaskId(); LOGGER.info("opName:{} taskId:{} taskIndex:{} keyGroup:{}", name, taskId, taskIndex, keyGroup); - this.graphState = StateFactory.buildGraphState(descriptor, runtimeContext.getConfiguration()); + this.graphState = StateFactory.buildGraphState(descriptor, + runtimeContext.getConfiguration()); recover(); this.function = new DynamicGraphMaterializeFunction<>(graphState); } @@ -105,12 +113,28 @@ public class GraphViewMaterializeOp<K, VV, EV> extends AbstractOneInputOperator< this.graphState.manage().operate().setCheckpointId(checkpointId); this.graphState.manage().operate().finish(); this.graphState.manage().operate().archive(); + + if (graphViewDesc.getBackend() == BackendType.Paimon) { + int taskIndex = runtimeContext.getTaskArgs().getTaskIndex(); + + 
PaimonCommitRegistry registry = PaimonCommitRegistry.getInstance(); + List<TaskCommitMessage> messages = registry.pollMessages(taskIndex); + if (messages != null && !messages.isEmpty()) { + for (TaskCommitMessage message : messages) { + List<CommitMessage> msg = message.getMessages(); + LOGGER.info("task {} emits windowId:{} chkId:{} table:{} messages:{}", + taskIndex, windowId, checkpointId, message.getTableName(), + msg.size()); + collectValue(new PaimonMessage(checkpointId, message.getTableName(), + msg)); + } + } + } LOGGER.info("do checkpoint over, checkpointId: {}", checkpointId); } @Override public void finish(long windowId) { - } @Override @@ -159,7 +183,8 @@ public class GraphViewMaterializeOp<K, VV, EV> extends AbstractOneInputOperator< } } - public static class DynamicGraphMaterializeFunction<K, VV, EV> implements GraphMaterializeFunction<K, VV, EV> { + public static class DynamicGraphMaterializeFunction<K, VV, EV> implements + GraphMaterializeFunction<K, VV, EV> { private final GraphState<K, VV, EV> graphState; diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/PaimonGlobalSink.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/PaimonGlobalSink.java new file mode 100644 index 00000000..51c56df1 --- /dev/null +++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/main/java/org/apache/geaflow/operator/impl/graph/materialize/PaimonGlobalSink.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.geaflow.operator.impl.graph.materialize;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.geaflow.api.context.RuntimeContext;
+import org.apache.geaflow.api.function.RichWindowFunction;
+import org.apache.geaflow.api.function.io.SinkFunction;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
+import org.apache.geaflow.common.exception.GeaflowRuntimeException;
+import org.apache.geaflow.store.paimon.PaimonCatalogClient;
+import org.apache.geaflow.store.paimon.PaimonCatalogManager;
+import org.apache.geaflow.store.paimon.commit.PaimonMessage;
+import org.apache.geaflow.store.paimon.config.PaimonConfigKeys;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PaimonGlobalSink extends RichWindowFunction implements SinkFunction<PaimonMessage> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PaimonGlobalSink.class);
+
+    private String dbName;
+    private String jobName;
+    private long windowId;
+
+    private List<PaimonMessage> paimonMessages;
+    private Map<String, StreamWriteBuilder> writeBuilders;
+    private PaimonCatalogClient client;
+
+    @Override
+    public void open(RuntimeContext runtimeContext) {
+        this.windowId = runtimeContext.getWindowId();
+        Configuration jobConfig = runtimeContext.getConfiguration();
+        this.client = PaimonCatalogManager.getCatalogClient(jobConfig);
+        this.jobName = jobConfig.getString(ExecutionConfigKeys.JOB_APP_NAME);
+        this.dbName = jobConfig.getString(PaimonConfigKeys.PAIMON_STORE_DATABASE);
+
+        this.writeBuilders = new HashMap<>();
+        this.paimonMessages = new ArrayList<>();
+        LOGGER.info("init paimon sink with db {}", dbName);
+    }
+
+    @Override
+    public void write(PaimonMessage message) throws Exception {
+        paimonMessages.add(message);
+    }
+
+    @Override
+    public void finish() {
+        int messageSize = paimonMessages.size();
+        if (messageSize == 0) {
+            LOGGER.info("commit windowId:{} empty messages", windowId);
+            return;
+        }
+
+        final long startTime = System.currentTimeMillis();
+        final long checkpointId = paimonMessages.get(0).getCheckpointId();
+
+        Map<String, List<CommitMessage>> tableMessages = new HashMap<>();
+        for (PaimonMessage message : paimonMessages) {
+            List<CommitMessage> messages = message.getMessages();
+            if (messages != null && !messages.isEmpty()) {
+                String tableName = message.getTableName();
+                List<CommitMessage> commitMessages = tableMessages.computeIfAbsent(tableName,
+                    k -> new ArrayList<>());
+                commitMessages.addAll(messages);
+            }
+        }
+        long deserializeTime = System.currentTimeMillis() - startTime;
+        if (!tableMessages.isEmpty()) {
+
+            for (Map.Entry<String, List<CommitMessage>> entry : tableMessages.entrySet()) {
+                LOGGER.info("commit checkpointId: {} table: {} messages: {}",
+                    checkpointId, entry.getKey(), entry.getValue().size());
+                StreamWriteBuilder writeBuilder = getWriteBuilder(entry.getKey());
+                try (StreamTableCommit commit = writeBuilder.newCommit()) {
+                    try {
+                        commit.commit(checkpointId, entry.getValue());
+                    } catch (Throwable e) {
+                        LOGGER.warn("commit failed: {}", e.getMessage(), e);
+                        Map<Long, List<CommitMessage>> commitIdAndMessages = new HashMap<>();
+                        commitIdAndMessages.put(checkpointId, entry.getValue());
+                        commit.filterAndCommit(commitIdAndMessages);
+                    }
+                } catch (Throwable e) {
+                    LOGGER.error("Failed to commit data into Paimon: {}", e.getMessage(), e);
+                    throw new GeaflowRuntimeException("Failed to commit data into Paimon.", e);
+                }
+            }
+        }
+        paimonMessages.clear();
+        LOGGER.info("commit checkpointId: {} messages: {} deserialize time: {} ms", checkpointId,
+            messageSize, deserializeTime);
+    }
+
+    private StreamWriteBuilder getWriteBuilder(String tableName) {
+        return writeBuilders.computeIfAbsent(tableName, k -> {
+            try {
+                FileStoreTable table = (FileStoreTable) client.getTable(Identifier.create(dbName,
+                    tableName));
+                return table.newStreamWriteBuilder().withCommitUser(jobName);
+            } catch (Throwable e) {
+                throw new GeaflowRuntimeException(e);
+            }
+        });
+    }
+
+    @Override
+    public void close() {
+        LOGGER.info("close sink");
+        if (client != null) {
+            client.close();
+        }
+    }
+
+}
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/graph/view/materialize/MaterializedIncGraph.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/graph/view/materialize/MaterializedIncGraph.java
index a8beef88..45bfd98a 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/graph/view/materialize/MaterializedIncGraph.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/graph/view/materialize/MaterializedIncGraph.java
@@ -27,14 +27,19 @@ import org.apache.geaflow.model.graph.edge.IEdge;
 import org.apache.geaflow.model.graph.vertex.IVertex;
 import org.apache.geaflow.operator.base.AbstractOperator;
 import org.apache.geaflow.operator.impl.graph.materialize.GraphViewMaterializeOp;
+import org.apache.geaflow.operator.impl.graph.materialize.PaimonGlobalSink;
+import org.apache.geaflow.operator.impl.window.SinkOperator;
 import org.apache.geaflow.partitioner.IPartitioner;
 import org.apache.geaflow.partitioner.impl.KeyPartitioner;
 import org.apache.geaflow.pdata.graph.view.AbstractGraphView;
 import org.apache.geaflow.pdata.graph.window.WindowStreamGraph;
 import org.apache.geaflow.pdata.stream.Stream;
 import org.apache.geaflow.pdata.stream.TransformType;
+import org.apache.geaflow.pdata.stream.window.WindowStreamSink;
 import org.apache.geaflow.pipeline.context.IPipelineContext;
+import org.apache.geaflow.store.paimon.commit.PaimonMessage;
 import org.apache.geaflow.view.IViewDesc;
+import org.apache.geaflow.view.IViewDesc.BackendType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +72,16 @@ public class MaterializedIncGraph<K, VV, EV> extends AbstractGraphView<K, VV, EV
             + "edgeStream parallelism must <= number of graph shard num";
         this.edgeStream = this.edgeStream
             .keyBy(new WindowStreamGraph.DefaultEdgePartition<>());
-        super.context.addPAction(this);
+
+        if (graphViewDesc.getBackend() == BackendType.Paimon) {
+            SinkOperator<PaimonMessage> operator =
+                new SinkOperator<>(new PaimonGlobalSink());
+            WindowStreamSink globalSink = new WindowStreamSink<>(this, operator);
+            globalSink.withParallelism(1);
+            super.context.addPAction(globalSink);
+        } else {
+            super.context.addPAction(this);
+        }

        assert this.getParallelism() == graphViewDesc.getShardNum() : "Materialize parallelism "
            + "must be equal to the graph shard num.";
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/stream/window/WindowStreamSink.java
b/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/stream/window/WindowStreamSink.java index 4dc6de78..9c346013 100644 --- a/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/stream/window/WindowStreamSink.java +++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-pdata/src/main/java/org/apache/geaflow/pdata/stream/window/WindowStreamSink.java @@ -35,8 +35,7 @@ public class WindowStreamSink<T> extends Stream<T> implements PStreamSink<T> { super(pipelineContext, operator); } - public WindowStreamSink(Stream stream, - AbstractOperator operator) { + public WindowStreamSink(Stream stream, AbstractOperator operator) { super(stream, operator); } diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/main/java/org/apache/geaflow/plan/PipelinePlanBuilder.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/main/java/org/apache/geaflow/plan/PipelinePlanBuilder.java index e9240115..8ab196b4 100644 --- a/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/main/java/org/apache/geaflow/plan/PipelinePlanBuilder.java +++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/main/java/org/apache/geaflow/plan/PipelinePlanBuilder.java @@ -102,7 +102,8 @@ public class PipelinePlanBuilder implements Serializable { }); boolean isSingleWindow = this.pipelineGraph.getSourceVertices().stream().allMatch(v -> - ((AbstractOperator) v.getOperator()).getOpArgs().getOpType() == OpArgs.OpType.SINGLE_WINDOW_SOURCE); + ((AbstractOperator) v.getOperator()).getOpArgs().getOpType() + == OpArgs.OpType.SINGLE_WINDOW_SOURCE); if (isSingleWindow) { pipelineContext.getConfig().put(BATCH_NUMBER_PER_CHECKPOINT.getKey(), String.valueOf(SINGLE_WINDOW_CHECKPOINT_DURATION)); @@ -143,7 +144,8 @@ public class PipelinePlanBuilder implements Serializable { if (action instanceof PGraphMaterialize) { visitMaterializeAction((PGraphMaterialize) action); } else { - PipelineVertex pipelineVertex = new PipelineVertex(vId, stream.getOperator(), stream.getParallelism()); + PipelineVertex pipelineVertex = new PipelineVertex(vId, stream.getOperator(), + stream.getParallelism()); if (action instanceof PStreamSink) { pipelineVertex.setType(VertexType.sink); pipelineVertex.setVertexMode(VertexMode.append); @@ -178,11 +180,13 @@ public class PipelinePlanBuilder implements Serializable { Preconditions.checkArgument(vertexStreamInput != null && edgeStreamInput != null, "input vertex and edge stream must be not null"); - PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++, vertexStreamInput.getId(), + PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++, + vertexStreamInput.getId(), stream.getId(), vertexStreamInput.getPartition(), vertexStreamInput.getEncoder()); vertexInputEdge.setEdgeName(GraphRecordNames.Vertex.name()); this.pipelineGraph.addEdge(vertexInputEdge); - PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++, edgeStreamInput.getId(), + PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++, + edgeStreamInput.getId(), stream.getId(), edgeStreamInput.getPartition(), edgeStreamInput.getEncoder()); edgeInputEdge.setEdgeName(GraphRecordNames.Edge.name()); this.pipelineGraph.addEdge(edgeInputEdge); @@ -207,6 +211,30 @@ public class PipelinePlanBuilder implements Serializable { pipelineVertex.setAffinity(AffinityLevel.worker); break; } + case ContinueGraphMaterialize: + pipelineVertex.setType(VertexType.inc_process); + pipelineVertex.setAffinity(AffinityLevel.worker); + 
MaterializedIncGraph pGraphMaterialize = (MaterializedIncGraph) stream; + + Stream vertexInput = pGraphMaterialize.getInput(); + Stream edgeInput = (Stream) pGraphMaterialize.getEdges(); + Preconditions.checkArgument(vertexInput != null && edgeInput != null, + "input vertex and edge stream must be not null"); + + PipelineEdge vertexEdge = new PipelineEdge(this.edgeIdGenerator++, + vertexInput.getId(), + stream.getId(), vertexInput.getPartition(), vertexInput.getEncoder()); + vertexEdge.setEdgeName(GraphRecordNames.Vertex.name()); + this.pipelineGraph.addEdge(vertexEdge); + PipelineEdge edgeEdge = new PipelineEdge(this.edgeIdGenerator++, + edgeInput.getId(), + stream.getId(), edgeInput.getPartition(), edgeInput.getEncoder()); + edgeEdge.setEdgeName(GraphRecordNames.Edge.name()); + this.pipelineGraph.addEdge(edgeEdge); + + visitNode(vertexInput); + visitNode(edgeInput); + break; case ContinueGraphCompute: { pipelineVertex.setType(VertexType.inc_iterator); pipelineVertex.setAffinity(AffinityLevel.worker); @@ -217,26 +245,33 @@ public class PipelinePlanBuilder implements Serializable { pipelineVertex.setType(VertexType.inc_vertex_centric); break; default: - throw new GeaflowRuntimeException("not support graph compute type, " + computeType); + throw new GeaflowRuntimeException( + "not support graph compute type, " + computeType); } pipelineVertex.setIterations(pGraphCompute.getMaxIterations()); Stream vertexStreamInput = pGraphCompute.getInput(); Stream edgeStreamInput = (Stream) pGraphCompute.getEdges(); - Preconditions.checkArgument(vertexStreamInput != null && edgeStreamInput != null, + Preconditions.checkArgument( + vertexStreamInput != null && edgeStreamInput != null, "input vertex and edge stream must be not null"); - PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++, vertexStreamInput.getId(), - stream.getId(), vertexStreamInput.getPartition(), vertexStreamInput.getEncoder()); + PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++, + vertexStreamInput.getId(), + stream.getId(), vertexStreamInput.getPartition(), + vertexStreamInput.getEncoder()); vertexInputEdge.setEdgeName(GraphRecordNames.Vertex.name()); this.pipelineGraph.addEdge(vertexInputEdge); - PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++, edgeStreamInput.getId(), - stream.getId(), edgeStreamInput.getPartition(), edgeStreamInput.getEncoder()); + PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++, + edgeStreamInput.getId(), + stream.getId(), edgeStreamInput.getPartition(), + edgeStreamInput.getEncoder()); edgeInputEdge.setEdgeName(GraphRecordNames.Edge.name()); this.pipelineGraph.addEdge(edgeInputEdge); // iteration loop edge - PipelineEdge iterationEdge = buildIterationEdge(vId, pGraphCompute.getMsgEncoder()); + PipelineEdge iterationEdge = buildIterationEdge(vId, + pGraphCompute.getMsgEncoder()); this.pipelineGraph.addEdge(iterationEdge); buildIterationAggVertexAndEdge(pipelineVertex); @@ -255,26 +290,33 @@ public class PipelinePlanBuilder implements Serializable { pipelineVertex.setType(VertexType.vertex_centric); break; default: - throw new GeaflowRuntimeException("not support graph compute type, " + computeType); + throw new GeaflowRuntimeException( + "not support graph compute type, " + computeType); } pipelineVertex.setIterations(pGraphCompute.getMaxIterations()); Stream vertexStreamInput = pGraphCompute.getInput(); Stream edgeStreamInput = (Stream) pGraphCompute.getEdges(); - Preconditions.checkArgument(vertexStreamInput != null && 
edgeStreamInput != null, + Preconditions.checkArgument( + vertexStreamInput != null && edgeStreamInput != null, "input vertex and edge stream must be not null"); - PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++, vertexStreamInput.getId(), - stream.getId(), vertexStreamInput.getPartition(), vertexStreamInput.getEncoder()); + PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++, + vertexStreamInput.getId(), + stream.getId(), vertexStreamInput.getPartition(), + vertexStreamInput.getEncoder()); vertexInputEdge.setEdgeName(GraphRecordNames.Vertex.name()); this.pipelineGraph.addEdge(vertexInputEdge); - PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++, edgeStreamInput.getId(), - stream.getId(), edgeStreamInput.getPartition(), edgeStreamInput.getEncoder()); + PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++, + edgeStreamInput.getId(), + stream.getId(), edgeStreamInput.getPartition(), + edgeStreamInput.getEncoder()); edgeInputEdge.setEdgeName(GraphRecordNames.Edge.name()); this.pipelineGraph.addEdge(edgeInputEdge); // iteration loop edge - PipelineEdge iterationEdge = buildIterationEdge(vId, pGraphCompute.getMsgEncoder()); + PipelineEdge iterationEdge = buildIterationEdge(vId, + pGraphCompute.getMsgEncoder()); this.pipelineGraph.addEdge(iterationEdge); buildIterationAggVertexAndEdge(pipelineVertex); @@ -293,28 +335,35 @@ public class PipelinePlanBuilder implements Serializable { pipelineVertex.setType(VertexType.vertex_centric); break; default: - throw new GeaflowRuntimeException("not support graph traversal type, " + traversalType); + throw new GeaflowRuntimeException( + "not support graph traversal type, " + traversalType); } pipelineVertex.setIterations(windowGraph.getMaxIterations()); Stream vertexStreamInput = windowGraph.getInput(); Stream edgeStreamInput = (Stream) windowGraph.getEdges(); - Preconditions.checkArgument(vertexStreamInput != null && edgeStreamInput != null, + Preconditions.checkArgument( + vertexStreamInput != null && edgeStreamInput != null, "input vertex and edge stream must be not null"); - PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++, vertexStreamInput.getId(), - stream.getId(), vertexStreamInput.getPartition(), vertexStreamInput.getEncoder()); + PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++, + vertexStreamInput.getId(), + stream.getId(), vertexStreamInput.getPartition(), + vertexStreamInput.getEncoder()); vertexInputEdge.setEdgeName(GraphRecordNames.Vertex.name()); this.pipelineGraph.addEdge(vertexInputEdge); - PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++, edgeStreamInput.getId(), - stream.getId(), edgeStreamInput.getPartition(), edgeStreamInput.getEncoder()); + PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++, + edgeStreamInput.getId(), + stream.getId(), edgeStreamInput.getPartition(), + edgeStreamInput.getEncoder()); edgeInputEdge.setEdgeName(GraphRecordNames.Edge.name()); this.pipelineGraph.addEdge(edgeInputEdge); // Add request input. 
if (windowGraph.getRequestStream() != null) { Stream requestStreamInput = (Stream) windowGraph.getRequestStream(); - PipelineEdge requestInputEdge = new PipelineEdge(this.edgeIdGenerator++, requestStreamInput.getId(), + PipelineEdge requestInputEdge = new PipelineEdge(this.edgeIdGenerator++, + requestStreamInput.getId(), stream.getId(), requestStreamInput.getPartition(), requestStreamInput.getEncoder()); requestInputEdge.setEdgeName(GraphRecordNames.Request.name()); @@ -323,7 +372,8 @@ public class PipelinePlanBuilder implements Serializable { } // iteration loop edge - PipelineEdge iterationEdge = buildIterationEdge(vId, windowGraph.getMsgEncoder()); + PipelineEdge iterationEdge = buildIterationEdge(vId, + windowGraph.getMsgEncoder()); this.pipelineGraph.addEdge(iterationEdge); buildIterationAggVertexAndEdge(pipelineVertex); @@ -342,31 +392,38 @@ public class PipelinePlanBuilder implements Serializable { pipelineVertex.setType(VertexType.inc_vertex_centric); break; default: - throw new GeaflowRuntimeException("not support graph traversal type, " + traversalType); + throw new GeaflowRuntimeException( + "not support graph traversal type, " + traversalType); } pipelineVertex.setIterations(windowGraph.getMaxIterations()); Stream vertexStreamInput = windowGraph.getInput(); Stream edgeStreamInput = (Stream) windowGraph.getEdges(); - Preconditions.checkArgument(vertexStreamInput != null && edgeStreamInput != null, + Preconditions.checkArgument( + vertexStreamInput != null && edgeStreamInput != null, "input vertex and edge stream must be not null"); // Add vertex input. - PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++, vertexStreamInput.getId(), - stream.getId(), vertexStreamInput.getPartition(), vertexStreamInput.getEncoder()); + PipelineEdge vertexInputEdge = new PipelineEdge(this.edgeIdGenerator++, + vertexStreamInput.getId(), + stream.getId(), vertexStreamInput.getPartition(), + vertexStreamInput.getEncoder()); vertexInputEdge.setEdgeName(GraphRecordNames.Vertex.name()); this.pipelineGraph.addEdge(vertexInputEdge); // Add edge input. - PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++, edgeStreamInput.getId(), - stream.getId(), edgeStreamInput.getPartition(), edgeStreamInput.getEncoder()); + PipelineEdge edgeInputEdge = new PipelineEdge(this.edgeIdGenerator++, + edgeStreamInput.getId(), + stream.getId(), edgeStreamInput.getPartition(), + edgeStreamInput.getEncoder()); edgeInputEdge.setEdgeName(GraphRecordNames.Edge.name()); this.pipelineGraph.addEdge(edgeInputEdge); // Add request input. 
if (windowGraph.getRequestStream() != null) { Stream requestStreamInput = (Stream) windowGraph.getRequestStream(); - PipelineEdge requestInputEdge = new PipelineEdge(this.edgeIdGenerator++, requestStreamInput.getId(), + PipelineEdge requestInputEdge = new PipelineEdge(this.edgeIdGenerator++, + requestStreamInput.getId(), stream.getId(), requestStreamInput.getPartition(), requestStreamInput.getEncoder()); requestInputEdge.setEdgeName(GraphRecordNames.Request.name()); @@ -375,7 +432,8 @@ public class PipelinePlanBuilder implements Serializable { } // Add iteration loop edge - PipelineEdge iterationEdge = buildIterationEdge(vId, windowGraph.getMsgEncoder()); + PipelineEdge iterationEdge = buildIterationEdge(vId, + windowGraph.getMsgEncoder()); this.pipelineGraph.addEdge(iterationEdge); buildIterationAggVertexAndEdge(pipelineVertex); @@ -387,9 +445,11 @@ public class PipelinePlanBuilder implements Serializable { case StreamTransform: { pipelineVertex.setType(VertexType.process); Stream inputStream = stream.getInput(); - Preconditions.checkArgument(inputStream != null, "input stream must be not null"); + Preconditions.checkArgument(inputStream != null, + "input stream must be not null"); - PipelineEdge pipelineEdge = new PipelineEdge(this.edgeIdGenerator++, inputStream.getId(), stream.getId(), + PipelineEdge pipelineEdge = new PipelineEdge(this.edgeIdGenerator++, + inputStream.getId(), stream.getId(), inputStream.getPartition(), inputStream.getEncoder()); this.pipelineGraph.addEdge(pipelineEdge); @@ -400,9 +460,11 @@ public class PipelinePlanBuilder implements Serializable { pipelineVertex.setType(VertexType.inc_process); pipelineVertex.setAffinity(AffinityLevel.worker); Stream inputStream = stream.getInput(); - Preconditions.checkArgument(inputStream != null, "input stream must be not null"); + Preconditions.checkArgument(inputStream != null, + "input stream must be not null"); - PipelineEdge pipelineEdge = new PipelineEdge(this.edgeIdGenerator++, inputStream.getId(), stream.getId(), + PipelineEdge pipelineEdge = new PipelineEdge(this.edgeIdGenerator++, + inputStream.getId(), stream.getId(), inputStream.getPartition(), inputStream.getEncoder()); this.pipelineGraph.addEdge(pipelineEdge); @@ -414,7 +476,8 @@ public class PipelinePlanBuilder implements Serializable { WindowUnionStream unionStream = (WindowUnionStream) stream; Stream mainInput = stream.getInput(); - PipelineEdge mainEdge = new PipelineEdge(this.edgeIdGenerator++, mainInput.getId(), unionStream.getId(), + PipelineEdge mainEdge = new PipelineEdge(this.edgeIdGenerator++, + mainInput.getId(), unionStream.getId(), mainInput.getPartition(), unionStream.getEncoder()); mainEdge.setStreamOrdinal(0); this.pipelineGraph.addEdge(mainEdge); @@ -423,8 +486,10 @@ public class PipelinePlanBuilder implements Serializable { List<Stream> otherInputs = unionStream.getUnionWindowDataStreamList(); for (int index = 0; index < otherInputs.size(); index++) { Stream otherInput = otherInputs.get(index); - PipelineEdge rightEdge = new PipelineEdge(this.edgeIdGenerator++, otherInput.getId(), - unionStream.getId(), otherInput.getPartition(), otherInput.getEncoder()); + PipelineEdge rightEdge = new PipelineEdge(this.edgeIdGenerator++, + otherInput.getId(), + unionStream.getId(), otherInput.getPartition(), + otherInput.getEncoder()); rightEdge.setStreamOrdinal(index + 1); this.pipelineGraph.addEdge(rightEdge); visitNode(otherInput); @@ -432,7 +497,8 @@ public class PipelinePlanBuilder implements Serializable { break; } default: - throw new 
GeaflowRuntimeException("Not supported transform type: " + stream.getTransformType()); + throw new GeaflowRuntimeException( + "Not supported transform type: " + stream.getTransformType()); } this.pipelineGraph.addVertex(pipelineVertex); } @@ -474,7 +540,8 @@ public class PipelinePlanBuilder implements Serializable { // Union Optimization. boolean isExtraOptimizeSink = pipelineConfig.getBoolean(ENABLE_EXTRA_OPTIMIZE_SINK); new UnionOptimizer(isExtraOptimizeSink).optimizePlan(pipelineGraph); - LOGGER.info("union optimize: {}", new PlanGraphVisualization(pipelineGraph).getGraphviz()); + LOGGER.info("union optimize: {}", + new PlanGraphVisualization(pipelineGraph).getGraphviz()); } new PipelineGraphOptimizer().optimizePipelineGraph(pipelineGraph); } diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/test/java/org/apache/geaflow/plan/PipelinePlanTest.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/test/java/org/apache/geaflow/plan/PipelinePlanTest.java index 5926b0ca..26ff4f35 100644 --- a/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/test/java/org/apache/geaflow/plan/PipelinePlanTest.java +++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-plan/src/test/java/org/apache/geaflow/plan/PipelinePlanTest.java @@ -60,6 +60,7 @@ import org.apache.geaflow.plan.graph.PipelineVertex; import org.apache.geaflow.plan.optimizer.PipelineGraphOptimizer; import org.apache.geaflow.view.GraphViewBuilder; import org.apache.geaflow.view.IViewDesc; +import org.apache.geaflow.view.IViewDesc.BackendType; import org.apache.geaflow.view.graph.GraphViewDesc; import org.apache.geaflow.view.graph.PGraphView; import org.apache.geaflow.view.graph.PIncGraphView; @@ -186,14 +187,15 @@ public class PipelinePlanTest extends BasePlanTest { Assert.assertEquals(vertexMap.size(), 8); } - @Test - public void testMaterialize() { - AtomicInteger idGenerator = new AtomicInteger(0); - AbstractPipelineContext context = mock(AbstractPipelineContext.class); - when(context.generateId()).then(invocation -> idGenerator.incrementAndGet()); + public void testMaterialize(IViewDesc.BackendType backendType) { Configuration configuration = new Configuration(); configuration.put(FrameworkConfigKeys.INC_STREAM_MATERIALIZE_DISABLE, Boolean.TRUE.toString()); - when(context.getConfig()).thenReturn(configuration); + AbstractPipelineContext context = new AbstractPipelineContext(configuration) { + @Override + public int generateId() { + return idGenerator.incrementAndGet(); + } + }; PWindowSource<Integer> source1 = new WindowStreamSource<>(context, new CollectionSource<>(new ArrayList<>()), SizeTumblingWindow.of(10)); @@ -208,7 +210,7 @@ public class PipelinePlanTest extends BasePlanTest { final String graphName = "graph_view_name"; GraphViewDesc graphViewDesc = GraphViewBuilder.createGraphView(graphName) .withShardNum(4) - .withBackend(IViewDesc.BackendType.RocksDB) + .withBackend(backendType) .withSchema(new GraphMetaType(IntegerType.INSTANCE, ValueVertex.class, Integer.class, ValueEdge.class, IntegerType.class)) .build(); @@ -219,7 +221,6 @@ public class PipelinePlanTest extends BasePlanTest { vertices.window(WindowFactory.createSizeTumblingWindow(1)), edges.window(WindowFactory.createSizeTumblingWindow(1))); incGraphView.materialize(); - when(context.getActions()).thenReturn(ImmutableList.of(((IncGraphView) incGraphView).getMaterializedIncGraph())); PipelinePlanBuilder planBuilder = new PipelinePlanBuilder(); PipelineGraph pipelineGraph = planBuilder.buildPlan(context); @@ -227,8 +228,21 @@ public class 
PipelinePlanTest extends BasePlanTest { optimizer.optimizePipelineGraph(pipelineGraph); Map<Integer, PipelineVertex> vertexMap = pipelineGraph.getVertexMap(); - Assert.assertEquals(vertexMap.size(), 3); + if (backendType == BackendType.Paimon) { + Assert.assertEquals(vertexMap.size(), 4); + } else { + Assert.assertEquals(vertexMap.size(), 3); + } + } + @Test + public void testRocksDBMaterialize() { + testMaterialize(BackendType.RocksDB); + } + + @Test + public void testPaimonMaterialize() { + testMaterialize(BackendType.Paimon); } @Test diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLInsertTest.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLInsertTest.java index 7d7d34e5..40954dcd 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLInsertTest.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/GQLInsertTest.java @@ -19,7 +19,9 @@ package org.apache.geaflow.dsl.runtime.query; +import org.apache.geaflow.common.config.keys.DSLConfigKeys; import org.apache.geaflow.common.config.keys.FrameworkConfigKeys; +import org.apache.geaflow.view.IViewDesc.BackendType; import org.testng.annotations.Test; public class GQLInsertTest { @@ -114,4 +116,14 @@ public class GQLInsertTest { .execute() .checkSinkResult(); } + + @Test + public void testInsertAndQuery_006() throws Exception { + QueryTester + .build() + .withConfig(DSLConfigKeys.GEAFLOW_DSL_STORE_TYPE.getKey(), BackendType.Paimon.name()) + .withQueryPath("/query/gql_insert_and_graph_006.sql") + .execute() + .checkSinkResult(); + } } diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_insert_and_graph_006.txt b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/gql_insert_and_graph_006.txt new file mode 100644 index 00000000..e69de29b diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_insert_and_graph_006.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_insert_and_graph_006.sql new file mode 100644 index 00000000..47f57dd4 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/gql_insert_and_graph_006.sql @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +set geaflow.dsl.window.size = 3; + +CREATE GRAPH dy_modern ( + Vertex person ( + id bigint ID, + name varchar, + age int + ), + Vertex software ( + id bigint ID, + name varchar, + lang varchar + ), + Edge knows ( + srcId bigint SOURCE ID, + targetId bigint DESTINATION ID, + weight double + ), + Edge created ( + srcId bigint SOURCE ID, + targetId bigint DESTINATION ID, + weight double + ) +) WITH ( + storeType='paimon', + shardCount = 1 +); + +CREATE TABLE tbl_result ( + id bigint, + name varchar, + age int +) WITH ( + type='file', + geaflow.dsl.file.path='${target}' +); + +CREATE TABLE tbl_person ( + id bigint, + name varchar, + age int +) WITH ( + type='file', + geaflow.dsl.file.path='resource:///data/modern_vertex_person.txt' +); + +USE GRAPH dy_modern; + +INSERT INTO dy_modern.person(id, name, age) +SELECT * FROM tbl_person; +; diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonGraphStore.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonGraphStore.java index 479786c1..49db1c58 100644 --- a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonGraphStore.java +++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonGraphStore.java @@ -19,27 +19,75 @@ package org.apache.geaflow.store.paimon; +import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_DATABASE; +import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_EDGE_TABLE; +import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_INDEX_TABLE; +import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_VERTEX_TABLE; + +import org.apache.commons.lang3.StringUtils; +import org.apache.geaflow.common.config.Configuration; import org.apache.geaflow.state.pushdown.inner.CodeGenFilterConverter; import org.apache.geaflow.state.pushdown.inner.DirectFilterConverter; import org.apache.geaflow.state.pushdown.inner.IFilterConverter; import org.apache.geaflow.store.api.graph.IPushDownStore; import org.apache.geaflow.store.config.StoreConfigKeys; import org.apache.geaflow.store.context.StoreContext; +import org.apache.paimon.catalog.Identifier; public abstract class BasePaimonGraphStore extends BasePaimonStore implements IPushDownStore { protected IFilterConverter filterConverter; + protected String vertexTable; + protected String edgeTable; + protected String indexTable; @Override public void init(StoreContext storeContext) { super.init(storeContext); - boolean codegenEnable = - storeContext.getConfig().getBoolean(StoreConfigKeys.STORE_FILTER_CODEGEN_ENABLE); - filterConverter = codegenEnable ? new CodeGenFilterConverter() : new DirectFilterConverter(); + Configuration config = storeContext.getConfig(); + boolean codegenEnable = config.getBoolean(StoreConfigKeys.STORE_FILTER_CODEGEN_ENABLE); + filterConverter = + codegenEnable ? new CodeGenFilterConverter() : new DirectFilterConverter(); + // use distributed table when distributed mode or database is configured. 
+ if (config.contains(PAIMON_STORE_DATABASE) || isDistributedMode) { + paimonStoreName = config.getString(PAIMON_STORE_DATABASE); + vertexTable = config.getString(PAIMON_STORE_VERTEX_TABLE); + edgeTable = config.getString(PAIMON_STORE_EDGE_TABLE); + indexTable = config.getString(PAIMON_STORE_INDEX_TABLE); + } } @Override public IFilterConverter getFilterConverter() { return filterConverter; } + + protected PaimonTableRWHandle createVertexTable(int shardId) { + String tableName = vertexTable; + if (StringUtils.isEmpty(tableName)) { + tableName = String.format("%s#%s", "vertex", shardId); + } + return createTable(tableName); + } + + protected PaimonTableRWHandle createEdgeTable(int shardId) { + String tableName = edgeTable; + if (StringUtils.isEmpty(edgeTable)) { + tableName = String.format("%s#%s", "edge", shardId); + } + return createTable(tableName); + } + + protected PaimonTableRWHandle createIndexTable(int shardId) { + String tableName = indexTable; + if (StringUtils.isEmpty(indexTable)) { + tableName = String.format("%s#%s", "vertex_index", shardId); + } + return createTable(tableName); + } + + private PaimonTableRWHandle createTable(String tableName) { + Identifier vertexIndexIdentifier = new Identifier(paimonStoreName, tableName); + return createKVTableHandle(vertexIndexIdentifier); + } } diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonStore.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonStore.java index 59255dfe..767f1b08 100644 --- a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonStore.java +++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/BasePaimonStore.java @@ -19,10 +19,15 @@ package org.apache.geaflow.store.paimon; +import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE; +import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE; + import org.apache.geaflow.common.config.keys.ExecutionConfigKeys; +import org.apache.geaflow.common.exception.GeaflowRuntimeException; import org.apache.geaflow.store.IStatefulStore; import org.apache.geaflow.store.api.graph.BaseGraphStore; import org.apache.geaflow.store.context.StoreContext; +import org.apache.paimon.catalog.Catalog.TableNotExistException; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.Table; @@ -42,19 +47,25 @@ public abstract class BasePaimonStore extends BaseGraphStore implements IStatefu protected static final String DIRECTION_COLUMN_NAME = "direction"; protected static final String LABEL_COLUMN_NAME = "label"; - protected PaimonTableCatalogClient client; + protected PaimonCatalogClient client; protected int shardId; protected String jobName; protected String paimonStoreName; protected long lastCheckpointId; + protected boolean isDistributedMode; + protected boolean enableAutoCreate; @Override public void init(StoreContext storeContext) { this.shardId = storeContext.getShardId(); this.jobName = storeContext.getConfig().getString(ExecutionConfigKeys.JOB_APP_NAME); this.paimonStoreName = this.jobName + "#" + this.shardId; - this.client = new PaimonTableCatalogClient(storeContext.getConfig()); + this.client = PaimonCatalogManager.getCatalogClient(storeContext.getConfig()); 
this.lastCheckpointId = Long.MAX_VALUE; + this.isDistributedMode = storeContext.getConfig() + .getBoolean(PAIMON_STORE_DISTRIBUTED_MODE_ENABLE); + this.enableAutoCreate = storeContext.getConfig() + .getBoolean(PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE); } @Override @@ -68,13 +79,23 @@ public abstract class BasePaimonStore extends BaseGraphStore implements IStatefu } protected PaimonTableRWHandle createKVTableHandle(Identifier identifier) { - Schema.Builder schemaBuilder = Schema.newBuilder(); - schemaBuilder.primaryKey(KEY_COLUMN_NAME); - schemaBuilder.column(KEY_COLUMN_NAME, DataTypes.BYTES()); - schemaBuilder.column(VALUE_COLUMN_NAME, DataTypes.BYTES()); - Schema schema = schemaBuilder.build(); - Table vertexTable = this.client.createTable(schema, identifier); - return new PaimonTableRWHandle(identifier, vertexTable); + Table vertexTable; + try { + vertexTable = this.client.getTable(identifier); + } catch (TableNotExistException e) { + if (enableAutoCreate) { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.primaryKey(KEY_COLUMN_NAME); + schemaBuilder.column(KEY_COLUMN_NAME, DataTypes.BYTES()); + schemaBuilder.column(VALUE_COLUMN_NAME, DataTypes.BYTES()); + Schema schema = schemaBuilder.build(); + vertexTable = this.client.createTable(schema, identifier); + } else { + throw new GeaflowRuntimeException("Table " + identifier + " not exist."); + } + } + + return new PaimonTableRWHandle(identifier, vertexTable, shardId, isDistributedMode); } protected PaimonTableRWHandle createEdgeTableHandle(Identifier identifier) { @@ -92,7 +113,7 @@ public abstract class BasePaimonStore extends BaseGraphStore implements IStatefu schemaBuilder.column(VALUE_COLUMN_NAME, DataTypes.BYTES()); Schema schema = schemaBuilder.build(); Table vertexTable = this.client.createTable(schema, identifier); - return new PaimonTableRWHandle(identifier, vertexTable); + return new PaimonTableRWHandle(identifier, vertexTable, shardId); } protected PaimonTableRWHandle createVertexTableHandle(Identifier identifier) { @@ -104,6 +125,6 @@ public abstract class BasePaimonStore extends BaseGraphStore implements IStatefu schemaBuilder.column(VALUE_COLUMN_NAME, DataTypes.BYTES()); Schema schema = schemaBuilder.build(); Table vertexTable = this.client.createTable(schema, identifier); - return new PaimonTableRWHandle(identifier, vertexTable); + return new PaimonTableRWHandle(identifier, vertexTable, shardId); } } diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/DynamicGraphPaimonStoreBase.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/DynamicGraphPaimonStoreBase.java index 8ad3bfba..9c4d202d 100644 --- a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/DynamicGraphPaimonStoreBase.java +++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/DynamicGraphPaimonStoreBase.java @@ -34,7 +34,6 @@ import org.apache.geaflow.store.api.graph.IDynamicGraphStore; import org.apache.geaflow.store.context.StoreContext; import org.apache.geaflow.store.paimon.proxy.IGraphMultiVersionedPaimonProxy; import org.apache.geaflow.store.paimon.proxy.PaimonProxyBuilder; -import org.apache.paimon.catalog.Identifier; public class DynamicGraphPaimonStoreBase<K, VV, EV> extends BasePaimonGraphStore implements IDynamicGraphStore<K, VV, EV> { @@ -47,17 +46,9 @@ public class DynamicGraphPaimonStoreBase<K, VV, EV> 
extends BasePaimonGraphStore int[] projection = new int[]{KEY_COLUMN_INDEX, VALUE_COLUMN_INDEX}; // TODO: Use graph schema to create table instead of KV table. - String vertexTableName = String.format("%s#%s", "vertex", shardId); - Identifier vertexIdentifier = new Identifier(paimonStoreName, vertexTableName); - PaimonTableRWHandle vertexHandle = createKVTableHandle(vertexIdentifier); - - String vertexIndexTableName = String.format("%s#%s", "vertex_index", shardId); - Identifier vertexIndexIdentifier = new Identifier(paimonStoreName, vertexIndexTableName); - PaimonTableRWHandle vertexIndexHandle = createKVTableHandle(vertexIndexIdentifier); - - String edgeTableName = String.format("%s#%s", "edge", shardId); - Identifier edgeIdentifier = new Identifier(paimonStoreName, edgeTableName); - PaimonTableRWHandle edgeHandle = createKVTableHandle(edgeIdentifier); + PaimonTableRWHandle vertexHandle = createVertexTable(shardId); + PaimonTableRWHandle vertexIndexHandle = createIndexTable(shardId); + PaimonTableRWHandle edgeHandle = createEdgeTable(shardId); IGraphKVEncoder<K, VV, EV> encoder = GraphKVEncoderFactory.build(storeContext.getConfig(), storeContext.getGraphSchema()); diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/KVPaimonStore.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/KVPaimonStore.java index 718f0f7c..e2c02841 100644 --- a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/KVPaimonStore.java +++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/KVPaimonStore.java @@ -102,14 +102,14 @@ public class KVPaimonStore<K, V> extends BasePaimonStore implements IKVStatefulS byte[] keyArray = this.kvSerializer.serializeKey(key); byte[] valueArray = this.kvSerializer.serializeValue(value); GenericRow record = GenericRow.of(keyArray, valueArray); - this.tableHandle.write(record, 0); + this.tableHandle.write(record); } @Override public void remove(K key) { byte[] keyArray = this.kvSerializer.serializeKey(key); GenericRow record = GenericRow.ofKind(RowKind.DELETE, keyArray, null); - this.tableHandle.write(record, 0); + this.tableHandle.write(record); } @Override diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableCatalogClient.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogClient.java similarity index 64% rename from geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableCatalogClient.java rename to geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogClient.java index 12391880..7db2118f 100644 --- a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableCatalogClient.java +++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogClient.java @@ -19,12 +19,13 @@ package org.apache.geaflow.store.paimon; -import static org.apache.geaflow.store.paimon.PaimonConfigKeys.PAIMON_OPTIONS_META_STORE; -import static org.apache.geaflow.store.paimon.PaimonConfigKeys.PAIMON_OPTIONS_WAREHOUSE; +import static 
org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_META_STORE; +import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_WAREHOUSE; import org.apache.geaflow.common.config.Configuration; import org.apache.geaflow.common.exception.GeaflowRuntimeException; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Catalog.TableNotExistException; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Identifier; @@ -35,61 +36,36 @@ import org.apache.paimon.table.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PaimonTableCatalogClient { +public class PaimonCatalogClient { - private static final Logger LOGGER = LoggerFactory.getLogger(PaimonTableCatalogClient.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PaimonCatalogClient.class); private final Configuration config; private Catalog catalog; - public PaimonTableCatalogClient(Configuration config) { + public PaimonCatalogClient(Catalog catalog, Configuration config) { this.config = config; + this.catalog = catalog; } public Table createTable(Schema schema, Identifier identifier) { - this.catalog = createCatalog(config); try { + LOGGER.info("create table {}", identifier.getFullName()); this.catalog.createDatabase(identifier.getDatabaseName(), true); this.catalog.createTable(identifier, schema, true); + return getTable(identifier); } catch (Exception e) { throw new GeaflowRuntimeException("Create database or table failed.", e); } - return getTable(identifier); - } - - private Catalog createCatalog(Configuration config) { - Options options = new Options(); - - String metastore = config.getString(PAIMON_OPTIONS_META_STORE); - String warehouse = config.getString(PAIMON_OPTIONS_WAREHOUSE); - - options.set(CatalogOptions.WAREHOUSE, warehouse.toLowerCase()); - options.set(CatalogOptions.METASTORE, metastore.toLowerCase()); - if (!metastore.equalsIgnoreCase("filesystem")) { - throw new UnsupportedOperationException("Not support meta store type " + metastore); - } - - if (!warehouse.startsWith("file://")) { - throw new UnsupportedOperationException( - "Only support warehouse path prefix: [file://]"); - } - - CatalogContext context = CatalogContext.create(options); - return CatalogFactory.createCatalog(context); } public Catalog getCatalog() { return this.catalog; } - public Table getTable(Identifier identifier) { - try { - return this.catalog.getTable(identifier); - } catch (Catalog.TableNotExistException e) { - // do something - throw new GeaflowRuntimeException("table not exist", e); - } + public Table getTable(Identifier identifier) throws TableNotExistException { + return this.catalog.getTable(identifier); } public void close() { diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogManager.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogManager.java new file mode 100644 index 00000000..b8294fc0 --- /dev/null +++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonCatalogManager.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.store.paimon; + +import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_META_STORE; +import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_WAREHOUSE; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.geaflow.common.config.Configuration; +import org.apache.geaflow.store.paimon.config.PaimonStoreConfig; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PaimonCatalogManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(PaimonCatalogManager.class); + + private static final Map<String, PaimonCatalogClient> catalogMap = new ConcurrentHashMap<>(); + + public static synchronized PaimonCatalogClient getCatalogClient(Configuration config) { + String warehouse = config.getString(PAIMON_STORE_WAREHOUSE); + return catalogMap.computeIfAbsent(warehouse, k -> createCatalog(config)); + } + + private static PaimonCatalogClient createCatalog(Configuration config) { + String metastore = config.getString(PAIMON_STORE_META_STORE); + String warehouse = config.getString(PAIMON_STORE_WAREHOUSE); + + Options options = new Options(); + options.set(CatalogOptions.WAREHOUSE, warehouse.toLowerCase()); + options.set(CatalogOptions.METASTORE, metastore.toLowerCase()); + Map<String, String> extraOptions = PaimonStoreConfig.getPaimonOptions(config); + if (extraOptions != null) { + for (Map.Entry<String, String> entry : extraOptions.entrySet()) { + LOGGER.info("add option: {}={}", entry.getKey(), entry.getValue()); + options.set(entry.getKey(), entry.getValue()); + } + } + if (!metastore.equalsIgnoreCase("filesystem")) { + throw new UnsupportedOperationException("Not support meta store type " + metastore); + } + + CatalogContext context = CatalogContext.create(options); + Catalog catalog = CatalogFactory.createCatalog(context); + return new PaimonCatalogClient(catalog, config); + } + +} diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonConfigKeys.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonConfigKeys.java deleted file mode 100644 index 294a9c1d..00000000 --- a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonConfigKeys.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.geaflow.store.paimon;
-
-import org.apache.geaflow.common.config.ConfigKey;
-import org.apache.geaflow.common.config.ConfigKeys;
-
-public class PaimonConfigKeys {
-
-    public static final ConfigKey PAIMON_OPTIONS_WAREHOUSE = ConfigKeys
-        .key("geaflow.store.paimon.options.warehouse")
-        .defaultValue("file:///tmp/paimon/")
-        .description("paimon warehouse, default LOCAL path, now support path prefix: "
-            + "[file://], Options for future: [hdfs://, oss://, s3://]");
-
-    public static final ConfigKey PAIMON_OPTIONS_META_STORE = ConfigKeys
-        .key("geaflow.store.paimon.options.meta.store")
-        .defaultValue("FILESYSTEM")
-        .description("Metastore of paimon catalog, now support [FILESYSTEM]. Options for future: "
-            + "[HIVE, JDBC].");
-
-    public static final ConfigKey PAIMON_OPTIONS_MEMTABLE_SIZE_MB = ConfigKeys
-        .key("geaflow.store.paimon.memtable.size.mb")
-        .defaultValue(128)
-        .description("paimon memtable size, default 256MB");
-}
diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableRWHandle.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableRWHandle.java
index c72585b2..a9201387 100644
--- a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableRWHandle.java
+++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/PaimonTableRWHandle.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.OptionalLong;
 import org.apache.geaflow.common.exception.GeaflowRuntimeException;
+import org.apache.geaflow.store.paimon.commit.PaimonCommitRegistry;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -31,27 +32,43 @@ import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.utils.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PaimonTableRWHandle {
 
-    private Identifier identifier;
-
-    private Table table;
-
+    private static final Logger LOGGER = LoggerFactory.getLogger(PaimonTableRWHandle.class);
+    private final int shardId;
+    private final Identifier identifier;
+    private final Table table;
+    private final boolean isDistributedMode;
+    private final PaimonCommitRegistry registry;
     private StreamTableWrite streamTableWrite;
-
     private List<CommitMessage> commitMessages = new ArrayList<>();
 
-    public PaimonTableRWHandle(Identifier identifier, Table table) {
+    public PaimonTableRWHandle(Identifier identifier, Table table, int shardId) {
+        this(identifier, table, shardId, false);
+    }
+
+    public PaimonTableRWHandle(Identifier identifier, Table table, int shardId,
+                               boolean isDistributedMode) {
+        this.shardId = shardId;
         this.identifier = identifier;
         this.table = table;
         this.streamTableWrite = table.newStreamWriteBuilder().newWrite();
+        this.registry = PaimonCommitRegistry.initInstance();
+        this.isDistributedMode = isDistributedMode;
+    }
+
+    public void write(GenericRow row) {
+        write(row, shardId);
     }
 
     public void write(GenericRow row, int bucket) {
@@ -63,13 +80,33 @@ public class PaimonTableRWHandle {
     }
 
     public void commit(long checkpointId) {
-        try (StreamTableCommit commit = table.newStreamWriteBuilder().newCommit()) {
-            flush(checkpointId);
-            commit.commit(checkpointId, commitMessages);
-            commitMessages.clear();
-        } catch (Exception e) {
-            throw new GeaflowRuntimeException("Failed to commit data into Paimon.", e);
+        commit(checkpointId, false);
+    }
+
+    public void commit(long checkpointId, boolean waitCompaction) {
+        flush(checkpointId, waitCompaction);
+        List<CommitMessage> messages = new ArrayList<>();
+        for (CommitMessage commitMessage : commitMessages) {
+            if (((CommitMessageImpl) commitMessage).isEmpty()) {
+                continue;
+            }
+            messages.add(commitMessage);
         }
+
+        if (isDistributedMode) {
+            LOGGER.info("{} pre commit chkId:{} messages:{} wait:{}",
+                this.identifier, checkpointId, messages.size(), waitCompaction);
+            registry.addMessages(shardId, table.name(), messages);
+        } else {
+            LOGGER.info("{} commit chkId:{} messages:{} wait:{}",
+                this.identifier, checkpointId, messages.size(), waitCompaction);
+            try (StreamTableCommit commit = table.newStreamWriteBuilder().newCommit()) {
+                commit.commit(checkpointId, messages);
+            } catch (Exception e) {
+                throw new GeaflowRuntimeException("Failed to commit data into Paimon.", e);
+            }
+        }
+        commitMessages.clear();
     }
 
     public void rollbackTo(long snapshotId) {
@@ -92,6 +129,8 @@ public class PaimonTableRWHandle {
         if (predicate != null) {
             readBuilder.withFilter(predicate);
         }
+        readBuilder.withBucketFilter(bucketId -> bucketId == shardId);
+
         List<Split> splits = readBuilder.newScan().plan().splits();
         TableRead tableRead = readBuilder.newRead();
         if (predicate != null) {
@@ -108,8 +147,13 @@ public class PaimonTableRWHandle {
     }
 
     public void flush(long checkpointIdentifier) {
+        flush(checkpointIdentifier, false);
+    }
+
+    public void flush(long checkpointIdentifier, boolean waitCompaction) {
         try {
-            this.commitMessages.addAll(streamTableWrite.prepareCommit(false, checkpointIdentifier));
+            this.commitMessages.addAll(
+                streamTableWrite.prepareCommit(waitCompaction, checkpointIdentifier));
         } catch (Exception e) {
             throw new GeaflowRuntimeException("Failed to flush data into Paimon.", e);
         }
diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/StaticGraphPaimonStoreBase.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/StaticGraphPaimonStoreBase.java
index 42079eec..4b1dfe66 100644
--- a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/StaticGraphPaimonStoreBase.java
+++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/StaticGraphPaimonStoreBase.java
@@ -35,7 +35,6 @@ import org.apache.geaflow.store.api.graph.IStaticGraphStore;
 import org.apache.geaflow.store.context.StoreContext;
 import org.apache.geaflow.store.paimon.proxy.IGraphPaimonProxy;
 import org.apache.geaflow.store.paimon.proxy.PaimonProxyBuilder;
-import org.apache.paimon.catalog.Identifier;
 
 public class StaticGraphPaimonStoreBase<K, VV, EV> extends BasePaimonGraphStore implements
     IStaticGraphStore<K, VV, EV> {
@@ -49,14 +48,8 @@ public class StaticGraphPaimonStoreBase<K, VV, EV> extends BasePaimonGraphStore
         super.init(storeContext);
 
         int[] projection = new int[]{KEY_COLUMN_INDEX, VALUE_COLUMN_INDEX};
-        // TODO: Use graph schema to create table instead of KV table.
-        String vertexTableName = String.format("%s#%s", "vertex", shardId);
-        Identifier vertexIdentifier = new Identifier(paimonStoreName, vertexTableName);
-        PaimonTableRWHandle vertexHandle = createKVTableHandle(vertexIdentifier);
-
-        String edgeTableName = String.format("%s#%s", "edge", shardId);
-        Identifier edgeIdentifier = new Identifier(paimonStoreName, edgeTableName);
-        PaimonTableRWHandle edgeHandle = createKVTableHandle(edgeIdentifier);
+        PaimonTableRWHandle vertexHandle = createVertexTable(shardId);
+        PaimonTableRWHandle edgeHandle = createEdgeTable(shardId);
 
         this.sortAtom = storeContext.getGraphSchema().getEdgeAtoms().get(1);
         IGraphKVEncoder<K, VV, EV> encoder = GraphKVEncoderFactory.build(storeContext.getConfig(),
diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonCommitRegistry.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonCommitRegistry.java
new file mode 100644
index 00000000..8468a410
--- /dev/null
+++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonCommitRegistry.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.paimon.commit;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.paimon.table.sink.CommitMessage;
+
+public class PaimonCommitRegistry {
+
+    private static PaimonCommitRegistry instance;
+
+    private final ConcurrentHashMap<Integer, List<TaskCommitMessage>> index2CommitMessages =
+        new ConcurrentHashMap<>();
+
+    public static synchronized PaimonCommitRegistry initInstance() {
+        if (instance == null) {
+            instance = new PaimonCommitRegistry();
+        }
+        return instance;
+    }
+
+    public static synchronized PaimonCommitRegistry getInstance() {
+        return instance;
+    }
+
+    public synchronized void addMessages(int index, String tableName,
+                                         List<CommitMessage> commitMessages) {
+        List<TaskCommitMessage> message = index2CommitMessages.computeIfAbsent(index,
+            key -> new ArrayList<>());
+        message.add(new TaskCommitMessage(tableName, commitMessages));
+    }
+
+    public synchronized List<TaskCommitMessage> pollMessages(int index) {
+        List<TaskCommitMessage> messages = index2CommitMessages.get(index);
+        if (messages == null) {
+            return null;
+        }
+        // Copy then clear, so callers never share the live list with later writers.
+        List<TaskCommitMessage> result = new ArrayList<>(messages);
+        messages.clear();
+        return result;
+    }
+
+    public static class TaskCommitMessage implements Serializable {
+
+        private final String tableName;
+        private final List<CommitMessage> messages;
+
+        public TaskCommitMessage(String tableName, List<CommitMessage> messages) {
+            this.tableName = tableName;
+            this.messages = messages;
+        }
+
+        public String getTableName() {
+            return tableName;
+        }
+
+        public List<CommitMessage> getMessages() {
+            return messages;
+        }
+    }
+}
diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonMessage.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonMessage.java
new file mode 100644
index 00000000..686ef8f4
--- /dev/null
+++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/commit/PaimonMessage.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.paimon.commit;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.geaflow.common.exception.GeaflowRuntimeException;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PaimonMessage implements Serializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PaimonMessage.class);
+    private static final ThreadLocal<CommitMessageSerializer> CACHE = ThreadLocal.withInitial(
+        CommitMessageSerializer::new);
+
+    private transient List<CommitMessage> messages;
+    private byte[] messageBytes;
+    private int serializerVersion;
+    private String tableName;
+    private long chkId;
+
+    public PaimonMessage(long chkId, String tableName, List<CommitMessage> messages) {
+        this.chkId = chkId;
+        this.tableName = tableName;
+
+        CommitMessageSerializer serializer = CACHE.get();
+        this.serializerVersion = serializer.getVersion();
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try {
+            serializer.serializeList(messages, new DataOutputViewStreamWrapper(out));
+            this.messageBytes = out.toByteArray();
+            LOGGER.info("ser bytes: {}", messageBytes.length);
+        } catch (Exception e) {
+            LOGGER.error("serialize message error", e);
+            throw new GeaflowRuntimeException(e);
+        }
+    }
+
+    public List<CommitMessage> getMessages() {
+        if (messages == null) {
+            if (messageBytes == null) {
+                throw new GeaflowRuntimeException("deserialize message error: messageBytes is null");
+            }
+            CommitMessageSerializer serializer = CACHE.get();
+            try {
+                ByteArrayInputStream in = new ByteArrayInputStream(messageBytes);
+                messages = serializer.deserializeList(serializerVersion,
+                    new DataInputViewStreamWrapper(in));
+            } catch (Exception e) {
+                LOGGER.error("deserialize message error", e);
+                throw new GeaflowRuntimeException(e);
+            }
+        }
+        return messages;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public long getCheckpointId() {
+        return chkId;
+    }
+}
diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonConfigKeys.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonConfigKeys.java
new file mode 100644
index 00000000..1e4c80b1
--- /dev/null
+++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonConfigKeys.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.paimon.config;
+
+import org.apache.geaflow.common.config.ConfigKey;
+import org.apache.geaflow.common.config.ConfigKeys;
+
+public class PaimonConfigKeys {
+
+    public static final ConfigKey PAIMON_STORE_WAREHOUSE = ConfigKeys
+        .key("geaflow.store.paimon.warehouse")
+        .defaultValue("file:///tmp/paimon/")
+        .description("paimon warehouse, defaults to a LOCAL path; supported path prefix for "
+            + "now: [file://]. Options for future: [hdfs://, oss://, s3://]");
+
+    public static final ConfigKey PAIMON_STORE_META_STORE = ConfigKeys
+        .key("geaflow.store.paimon.meta.store")
+        .defaultValue("FILESYSTEM")
+        .description("Metastore of paimon catalog; [FILESYSTEM] is supported for now. "
+            + "Options for future: [HIVE, JDBC].");
+
+    public static final ConfigKey PAIMON_STORE_OPTIONS = ConfigKeys
+        .key("geaflow.store.paimon.options")
+        .defaultValue("")
+        .description("extra options for the paimon catalog, a comma-separated list of "
+            + "key=value pairs");
+
+    public static final ConfigKey PAIMON_STORE_DATABASE = ConfigKeys
+        .key("geaflow.store.paimon.database")
+        .defaultValue("graph")
+        .description("paimon graph store database");
+
+    public static final ConfigKey PAIMON_STORE_VERTEX_TABLE = ConfigKeys
+        .key("geaflow.store.paimon.vertex.table")
+        .defaultValue("vertex")
+        .description("paimon graph store vertex table name");
+
+    public static final ConfigKey PAIMON_STORE_EDGE_TABLE = ConfigKeys
+        .key("geaflow.store.paimon.edge.table")
+        .defaultValue("edge")
+        .description("paimon graph store edge table name");
+
+    public static final ConfigKey PAIMON_STORE_INDEX_TABLE = ConfigKeys
+        .key("geaflow.store.paimon.index.table")
+        .defaultValue("index")
+        .description("paimon graph store index table name");
+
+    public static final ConfigKey PAIMON_STORE_DISTRIBUTED_MODE_ENABLE = ConfigKeys
+        .key("geaflow.store.paimon.distributed.mode.enable")
+        .defaultValue(true)
+        .description("whether to enable the paimon graph store distributed commit mode");
+
+    public static final ConfigKey PAIMON_STORE_TABLE_AUTO_CREATE_ENABLE = ConfigKeys
+        .key("geaflow.store.paimon.table.auto.create")
+        .defaultValue(false)
+        .description("whether to auto create paimon graph store tables");
+
+}
diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonStoreConfig.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonStoreConfig.java
new file mode 100644
index 00000000..e82da589
--- /dev/null
+++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/config/PaimonStoreConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.paimon.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.apache.geaflow.common.config.Configuration;
+
+public class PaimonStoreConfig {
+
+    public static Map<String, String> getPaimonOptions(Configuration config) {
+        String optionStr = config.getString(PaimonConfigKeys.PAIMON_STORE_OPTIONS, "");
+        if (StringUtils.isEmpty(optionStr)) {
+            return null;
+        }
+        Map<String, String> options = new HashMap<>();
+        String[] pairs = optionStr.trim().split(",");
+        for (String pair : pairs) {
+            // Split on the first '=' only, so option values may themselves contain '='.
+            String[] kv = pair.trim().split("=", 2);
+            if (kv.length < 2) {
+                continue;
+            }
+            options.put(kv[0], kv[1]);
+        }
+        return options;
+    }
+
+}
diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonBaseGraphProxy.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonBaseGraphProxy.java
index 2723988a..7fd237c3 100644
--- a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonBaseGraphProxy.java
+++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonBaseGraphProxy.java
@@ -80,14 +80,14 @@ public abstract class PaimonBaseGraphProxy<K, VV, EV> implements IGraphPaimonPro
     public void addEdge(IEdge<K, EV> edge) {
         Tuple<byte[], byte[]> tuple = this.encoder.getEdgeEncoder().format(edge);
         GenericRow record = GenericRow.of(tuple.f0, tuple.f1);
-        this.edgeHandle.write(record, 0);
+        this.edgeHandle.write(record);
     }
 
     @Override
     public void addVertex(IVertex<K, VV> vertex) {
         Tuple<byte[], byte[]> tuple = this.encoder.getVertexEncoder().format(vertex);
         GenericRow record = GenericRow.of(tuple.f0, tuple.f1);
-        this.vertexHandle.write(record, 0);
+        this.vertexHandle.write(record);
     }
 
     @Override
diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonGraphMultiVersionedProxy.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonGraphMultiVersionedProxy.java
index 3d91262c..0e77dc2b 100644
--- a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonGraphMultiVersionedProxy.java
+++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/main/java/org/apache/geaflow/store/paimon/proxy/PaimonGraphMultiVersionedProxy.java
@@ -132,7 +132,7 @@ public class PaimonGraphMultiVersionedProxy<K, VV, EV> implements
         Tuple<byte[], byte[]> tuple = edgeEncoder.format(edge);
         byte[] bVersion = getBinaryVersion(version);
         GenericRow record = GenericRow.of(concat(bVersion, tuple.f0), tuple.f1);
-        this.edgeHandle.write(record, 0);
+        this.edgeHandle.write(record);
     }
 
     @Override
@@ -141,8 +141,8 @@ public class PaimonGraphMultiVersionedProxy<K, VV, EV> implements
         Tuple<byte[], byte[]> tuple = vertexEncoder.format(vertex);
         byte[] bVersion = getBinaryVersion(version);
         GenericRow record = GenericRow.of(concat(bVersion, tuple.f0), tuple.f1);
         GenericRow index = GenericRow.of(concat(tuple.f0, bVersion), EMPTY_BYTES);
-        this.vertexHandle.write(record, 0);
-        this.vertexIndexHandle.write(index, 0);
+        this.vertexHandle.write(record);
+        this.vertexIndexHandle.write(index);
     }
 
     @Override
diff --git a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/test/java/org/apache/geaflow/store/paimon/PaimonRWHandleTest.java b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/test/java/org/apache/geaflow/store/paimon/PaimonRWHandleTest.java
index d48f1788..1fe1e9ba 100644
--- a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/test/java/org/apache/geaflow/store/paimon/PaimonRWHandleTest.java
+++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-paimon/src/test/java/org/apache/geaflow/store/paimon/PaimonRWHandleTest.java
@@ -96,7 +96,7 @@ public class PaimonRWHandleTest {
             value.getBytes() // value
         );
 
-        edgeHandle.write(row, 0);
+        edgeHandle.write(row);
 
         long checkpointId = 1L;
         edgeHandle.commit(checkpointId);
diff --git a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonDynamicGraphStateTest.java b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonDynamicGraphStateTest.java
index 371598a9..4e3523a6 100644
--- a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonDynamicGraphStateTest.java
+++ b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonDynamicGraphStateTest.java
@@ -45,7 +45,7 @@ import org.apache.geaflow.state.data.OneDegreeGraph;
 import org.apache.geaflow.state.descriptor.GraphStateDescriptor;
 import org.apache.geaflow.state.pushdown.filter.IEdgeFilter;
 import org.apache.geaflow.state.pushdown.filter.IVertexFilter;
-import org.apache.geaflow.store.paimon.PaimonConfigKeys;
+import org.apache.geaflow.store.paimon.config.PaimonConfigKeys;
 import org.apache.geaflow.utils.keygroup.DefaultKeyGroupAssigner;
 import org.apache.geaflow.utils.keygroup.KeyGroup;
 import org.testng.Assert;
@@ -67,7 +67,7 @@ public class PaimonDynamicGraphStateTest {
         config.put(FileConfigKeys.PERSISTENT_TYPE.getKey(), "LOCAL");
         config.put(FileConfigKeys.ROOT.getKey(), "/tmp/geaflow/chk/");
         config.put(FileConfigKeys.JSON_CONFIG.getKey(), GsonUtil.toJson(persistConfig));
-        config.put(PaimonConfigKeys.PAIMON_OPTIONS_WAREHOUSE.getKey(),
+        config.put(PaimonConfigKeys.PAIMON_STORE_WAREHOUSE.getKey(),
             "file:///tmp/PaimonDynamicGraphStateTest/");
     }
 
@@ -98,13 +98,23 @@ public class PaimonDynamicGraphStateTest {
 
     @Test
     public void testWriteRead() {
-        testApi(false);
+        Map<String, String> conf = new HashMap<>(config);
+        conf.put(StateConfigKeys.STATE_WRITE_ASYNC_ENABLE.getKey(), "false");
+        conf.put(PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(), "false");
+        testApi(conf);
     }
 
-    private void testApi(boolean async) {
-        Map<String, String> conf = config;
+    @Test
+    public void testWriteRead2() {
+        Map<String, String> conf = new HashMap<>(config);
+        conf.put(StateConfigKeys.STATE_WRITE_ASYNC_ENABLE.getKey(), "false");
+        conf.put(PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(), "false");
+        conf.put(PaimonConfigKeys.PAIMON_STORE_DATABASE.getKey(), "graph");
+        testApi(conf);
+    }
+
+    private void testApi(Map<String, String> conf) {
         conf.put(StateConfigKeys.STATE_WRITE_BUFFER_SIZE.getKey(), "100");
-        conf.put(StateConfigKeys.STATE_WRITE_ASYNC_ENABLE.getKey(), String.valueOf(async));
         GraphState<String, String, String> graphState = getGraphState(StringType.INSTANCE,
             "testApi", conf);
         graphState.manage().operate().setCheckpointId(1);
@@ -126,7 +136,6 @@ public class PaimonDynamicGraphStateTest {
         graphState.dynamicGraph().E().add(4L, new ValueEdge<>("1", "2", "6"));
         graphState.manage().operate().finish();
         graphState.manage().operate().archive();
-        ;
 
         List<IEdge<String, String>> list = graphState.dynamicGraph().E().query(1L, "1").asList();
         Assert.assertEquals(list.size(), 2);
@@ -191,7 +200,8 @@ public class PaimonDynamicGraphStateTest {
 
     @Test
     public void testKeyGroup() {
-        Map<String, String> conf = config;
+        Map<String, String> conf = new HashMap<>(config);
+        conf.put(PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(), "false");
         GraphState<String, String, String> graphState = getGraphState(StringType.INSTANCE,
             "testKeyGroup", conf);
         graphState.manage().operate().setCheckpointId(1);
diff --git a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
index 81937688..1d64a57f 100644
--- a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
+++ b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonGraphStateTest.java
@@ -19,13 +19,17 @@
 
 package org.apache.geaflow.state;
 
+import static org.apache.geaflow.common.config.keys.StateConfigKeys.STATE_WRITE_ASYNC_ENABLE;
+import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_DATABASE;
+import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE;
+import static org.apache.geaflow.store.paimon.config.PaimonConfigKeys.PAIMON_STORE_WAREHOUSE;
+
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.geaflow.common.config.Configuration;
 import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
-import org.apache.geaflow.common.config.keys.StateConfigKeys;
 import org.apache.geaflow.common.type.IType;
 import org.apache.geaflow.common.type.primitive.StringType;
 import org.apache.geaflow.common.utils.GsonUtil;
@@ -36,7 +40,7 @@ import org.apache.geaflow.model.graph.meta.GraphMeta;
 import org.apache.geaflow.model.graph.meta.GraphMetaType;
 import org.apache.geaflow.model.graph.vertex.impl.ValueVertex;
 import org.apache.geaflow.state.descriptor.GraphStateDescriptor;
-import org.apache.geaflow.store.paimon.PaimonConfigKeys;
+import org.apache.geaflow.store.paimon.config.PaimonConfigKeys;
 import org.apache.geaflow.utils.keygroup.DefaultKeyGroupAssigner;
 import org.apache.geaflow.utils.keygroup.KeyGroup;
 import org.testng.Assert;
@@ -58,7 +62,7 @@ public class PaimonGraphStateTest {
         config.put(FileConfigKeys.PERSISTENT_TYPE.getKey(), "LOCAL");
         config.put(FileConfigKeys.ROOT.getKey(), "/tmp/geaflow/chk/");
         config.put(FileConfigKeys.JSON_CONFIG.getKey(), GsonUtil.toJson(persistConfig));
-        config.put(PaimonConfigKeys.PAIMON_OPTIONS_WAREHOUSE.getKey(),
+        config.put(PaimonConfigKeys.PAIMON_STORE_WAREHOUSE.getKey(),
             "file:///tmp/PaimonGraphStateTest/");
     }
 
@@ -87,9 +91,7 @@ public class PaimonGraphStateTest {
         return graphState;
     }
 
-    public void testWriteRead(boolean async) {
-        Map<String, String> conf = new HashMap<>(config);
-        conf.put(StateConfigKeys.STATE_WRITE_ASYNC_ENABLE.getKey(), Boolean.toString(async));
+    public void testWriteRead(Map<String, String> conf) {
 
         GraphState<String, String, String> graphState = getGraphState(StringType.INSTANCE,
             "write_read", conf);
@@ -102,6 +104,7 @@ public class PaimonGraphStateTest {
             graphState.staticGraph().E().add(new ValueEdge<>("1", id, "edge_hello"));
         }
         // read nothing since not committed
+        boolean async = Boolean.parseBoolean(conf.get(STATE_WRITE_ASYNC_ENABLE.getKey()));
         if (!async) {
             Assert.assertNull(graphState.staticGraph().V().query("1").get());
             Assert.assertEquals(graphState.staticGraph().E().query("1").asList().size(), 0);
@@ -150,8 +153,26 @@ public class PaimonGraphStateTest {
 
     @Test
     public void testBothWriteMode() {
-        testWriteRead(true);
-        testWriteRead(false);
+        Map<String, String> conf = new HashMap<>();
+        conf.put(STATE_WRITE_ASYNC_ENABLE.getKey(), Boolean.TRUE.toString());
+        conf.put(PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(), "false");
+        testWriteRead(conf);
+
+        conf.put(STATE_WRITE_ASYNC_ENABLE.getKey(), Boolean.FALSE.toString());
+        testWriteRead(conf);
+    }
+
+    @Test
+    public void testBothWriteMode2() {
+        Map<String, String> conf = new HashMap<>();
+        conf.put(STATE_WRITE_ASYNC_ENABLE.getKey(), Boolean.TRUE.toString());
+        conf.put(PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(), "false");
+        conf.put(PAIMON_STORE_WAREHOUSE.getKey(), "/tmp/testBothWriteMode2");
+        conf.put(PAIMON_STORE_DATABASE.getKey(), "graph");
+        testWriteRead(conf);
+
+        conf.put(STATE_WRITE_ASYNC_ENABLE.getKey(), Boolean.FALSE.toString());
+        testWriteRead(conf);
     }
 }
diff --git a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonKeyStateTest.java b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonKeyStateTest.java
index 0f254149..c43095e8 100644
--- a/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonKeyStateTest.java
+++ b/geaflow/geaflow-state/geaflow-state-impl/src/test/java/org/apache/geaflow/state/PaimonKeyStateTest.java
@@ -27,7 +27,7 @@ import org.apache.geaflow.common.config.Configuration;
 import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
 import org.apache.geaflow.file.FileConfigKeys;
 import org.apache.geaflow.state.descriptor.KeyMapStateDescriptor;
-import org.apache.geaflow.store.paimon.PaimonConfigKeys;
+import org.apache.geaflow.store.paimon.config.PaimonConfigKeys;
 import org.apache.geaflow.utils.keygroup.DefaultKeyGroupAssigner;
 import org.apache.geaflow.utils.keygroup.KeyGroup;
 import org.testng.Assert;
@@ -46,7 +46,9 @@ public class PaimonKeyStateTest {
         config.put(ExecutionConfigKeys.JOB_APP_NAME.getKey(), "PaimonKeyStateTest");
         config.put(FileConfigKeys.PERSISTENT_TYPE.getKey(), "LOCAL");
         config.put(FileConfigKeys.ROOT.getKey(), "/tmp/geaflow/chk/");
-        config.put(PaimonConfigKeys.PAIMON_OPTIONS_WAREHOUSE.getKey(), "file:///tmp/PaimonKeyStateTest/");
+        config.put(PaimonConfigKeys.PAIMON_STORE_WAREHOUSE.getKey(),
+            "file:///tmp/PaimonKeyStateTest/");
+        config.put(PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(), "false");
     }
 
     @AfterClass
@@ -72,7 +74,7 @@ public class PaimonKeyStateTest {
         Assert.assertEquals(mapState.get("hello").size(), 0);
         // commit chk = 1, now be able to read data.
         mapState.manage().operate().archive();
-        Assert.assertEquals(mapState.get("hello").size(), 4);
+        Assert.assertEquals(mapState.get("hello").size(), 5);
 
         // set chk = 2
         mapState.manage().operate().setCheckpointId(2L);
@@ -81,7 +83,7 @@ public class PaimonKeyStateTest {
         conf2.put("conf2", "test");
         mapState.put("hello2", conf2);
         // cannot read data with chk = 2 since chk2 not committed.
-        Assert.assertEquals(mapState.get("hello").size(), 4);
+        Assert.assertEquals(mapState.get("hello").size(), 5);
         Assert.assertEquals(mapState.get("hello2").size(), 0);
 
         // commit chk = 2
@@ -89,8 +91,8 @@ public class PaimonKeyStateTest {
         mapState.manage().operate().archive();
 
         // now be able to read data
-        Assert.assertEquals(mapState.get("hello").size(), 4);
-        Assert.assertEquals(mapState.get("hello2").size(), 5);
+        Assert.assertEquals(mapState.get("hello").size(), 5);
+        Assert.assertEquals(mapState.get("hello2").size(), 6);
 
         // read data which not exists
         Assert.assertEquals(mapState.get("hello3").size(), 0);
diff --git a/geaflow/pom.xml b/geaflow/pom.xml
index c1735018..067a0e76 100644
--- a/geaflow/pom.xml
+++ b/geaflow/pom.xml
@@ -50,7 +50,7 @@
         <fastjson.version>1.2.71_noneautotype</fastjson.version>
         <hikaricp.version>4.0.3</hikaricp.version>
         <sqlite.version>3.40.0.0</sqlite.version>
-        <zstd.version>1.4.3-1</zstd.version>
+        <zstd.version>1.5.5-11</zstd.version>
         <rocksdb.version>7.7.3</rocksdb.version>
         <paimon.version>1.0.1</paimon.version>
         <snappy.version>1.1.8.4</snappy.version>
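For reference, a minimal sketch of how the geaflow.store.paimon.* keys introduced in config/PaimonConfigKeys.java fit together. All values below are illustrative only (the snapshot.* entries are standard Paimon options passed through PAIMON_STORE_OPTIONS), not defaults shipped by this commit:

import java.util.HashMap;
import java.util.Map;
import org.apache.geaflow.store.paimon.config.PaimonConfigKeys;

public class PaimonStoreConfigExample {

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        // Only the FILESYSTEM metastore and the file:// warehouse prefix are supported so far.
        config.put(PaimonConfigKeys.PAIMON_STORE_WAREHOUSE.getKey(), "file:///tmp/paimon/");
        config.put(PaimonConfigKeys.PAIMON_STORE_META_STORE.getKey(), "FILESYSTEM");
        // Extra catalog options travel as comma-separated key=value pairs and are
        // parsed by PaimonStoreConfig.getPaimonOptions(...) in PaimonCatalogManager.
        config.put(PaimonConfigKeys.PAIMON_STORE_OPTIONS.getKey(),
            "snapshot.num-retained.min=5,snapshot.num-retained.max=20");
        // In distributed mode (the default) writers stage commit messages in
        // PaimonCommitRegistry instead of committing locally.
        config.put(PaimonConfigKeys.PAIMON_STORE_DISTRIBUTED_MODE_ENABLE.getKey(), "true");
    }
}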
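The distributed mode splits the Paimon commit into two phases: each PaimonTableRWHandle stages its non-empty CommitMessages in the shared PaimonCommitRegistry, and a single global committer (PaimonGlobalSink in this commit, whose body is not shown in this part of the diff) drains and commits them once per checkpoint. A minimal sketch of the committer side, assuming the committer already holds the target Table and checkpoint id:

import java.util.List;
import org.apache.geaflow.store.paimon.commit.PaimonCommitRegistry;
import org.apache.geaflow.store.paimon.commit.PaimonCommitRegistry.TaskCommitMessage;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.StreamTableCommit;

public class DistributedCommitSketch {

    // Drain one shard's staged messages and commit those belonging to 'table'.
    static void commitShard(int shardId, Table table, long checkpointId) throws Exception {
        PaimonCommitRegistry registry = PaimonCommitRegistry.getInstance();
        List<TaskCommitMessage> pending = registry.pollMessages(shardId);
        if (pending == null || pending.isEmpty()) {
            return; // nothing was staged for this shard in this checkpoint
        }
        try (StreamTableCommit commit = table.newStreamWriteBuilder().newCommit()) {
            for (TaskCommitMessage msg : pending) {
                if (msg.getTableName().equals(table.name())) {
                    commit.commit(checkpointId, msg.getMessages());
                }
            }
        }
    }
}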
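PaimonMessage exists so commit messages can cross task boundaries: the CommitMessage list is serialized eagerly in the constructor (into a plain byte[] plus the serializer version) and deserialized lazily on the first getMessages() call. A small round-trip sketch, built with an empty list purely for illustration:

import java.util.Collections;
import java.util.List;
import org.apache.geaflow.store.paimon.commit.PaimonMessage;
import org.apache.paimon.table.sink.CommitMessage;

public class PaimonMessageRoundTrip {

    public static void main(String[] args) {
        // Serialized immediately; the transient message list is rebuilt on demand.
        PaimonMessage message = new PaimonMessage(1L, "vertex", Collections.emptyList());
        List<CommitMessage> restored = message.getMessages();
        System.out.println(message.getTableName() + "@" + message.getCheckpointId()
            + " -> " + restored.size() + " commit message(s)");
    }
}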
