This is an automated email from the ASF dual-hosted git repository.
qingwzhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git
The following commit(s) were added to refs/heads/master by this push:
new 69e14de22 feat: Paimon stream source. (#662)
69e14de22 is described below
commit 69e14de224361ddc585fd0626d20f60a54e1cdeb
Author: moses <[email protected]>
AuthorDate: Wed Nov 26 19:06:38 2025 +0800
feat: Paimon stream source. (#662)
* feat: Paimon stream source.
* add licenses
* check style
* check style
* minor fix
* fix list partitions
* throws exception
* fix tests
---
.../dsl/connector/paimon/IteratorWrapper.java | 51 +++
.../dsl/connector/paimon/PaimonConfigKeys.java | 14 +
.../connector/paimon/PaimonRecordDeserializer.java | 24 +-
.../dsl/connector/paimon/PaimonTableSource.java | 384 ++++++++++++++++-----
.../geaflow/dsl/connector/paimon/SourceMode.java | 26 ++
.../geaflow/dsl/connector/paimon/StartupMode.java | 56 +++
.../connector/paimon/PaimonTableConnectorTest.java | 307 +++++++++++++---
7 files changed, 712 insertions(+), 150 deletions(-)
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/IteratorWrapper.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/IteratorWrapper.java
new file mode 100644
index 000000000..3ce921708
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/IteratorWrapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.CloseableIterator;
+
+public class IteratorWrapper implements CloseableIterator<Row> {
+
+ private final CloseableIterator<InternalRow> iterator;
+ private final PaimonRecordDeserializer deserializer;
+
+ public IteratorWrapper(CloseableIterator<InternalRow> iterator,
+ PaimonRecordDeserializer deserializer) {
+ this.iterator = iterator;
+ this.deserializer = deserializer;
+ }
+
+ @Override
+ public void close() throws Exception {
+ iterator.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public Row next() {
+ return deserializer.deserialize(iterator.next());
+ }
+}
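For reference, a minimal sketch of how this wrapper is driven ("readBuilder", "split", and "tableSchema" are assumed to be in scope; the snippet is illustrative, not part of this commit):

    // Adapt a Paimon split into an iterator of GeaFlow rows.
    PaimonRecordDeserializer deserializer = new PaimonRecordDeserializer();
    deserializer.init(tableSchema);
    RecordReader<InternalRow> reader = readBuilder.newRead().createReader(split);
    try (IteratorWrapper rows = new IteratorWrapper(reader.toCloseableIterator(), deserializer)) {
        while (rows.hasNext()) {
            Row row = rows.next(); // each InternalRow is deserialized on the fly
        }
    }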
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java
index 106ef026b..7218ed73b 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java
@@ -49,4 +49,18 @@ public class PaimonConfigKeys {
.noDefaultValue()
.description("The paimon table name to read.");
+ public static final ConfigKey GEAFLOW_DSL_PAIMON_SOURCE_MODE = ConfigKeys
+ .key("geaflow.dsl.paimon.source.mode")
+ .defaultValue(SourceMode.BATCH.name())
+ .description("The paimon source mode, if stream, will continue to read data from paimon.");
+
+ public static final ConfigKey GEAFLOW_DSL_PAIMON_SCAN_SNAPSHOT_ID = ConfigKeys
+ .key("geaflow.dsl.paimon.scan.snapshot.id")
+ .defaultValue(null)
+ .description("If scan mode is from-snapshot, this parameter is required.");
+
+ public static final ConfigKey GEAFLOW_DSL_PAIMON_SCAN_MODE = ConfigKeys
+ .key("geaflow.dsl.paimon.scan.mode")
+ .defaultValue(StartupMode.LATEST.getValue())
+ .description("Determines the scan mode for paimon source, 'latest' or 'from-snapshot'.");
}
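Taken together, the three new keys steer the source. A rough sketch of a configuration that enables continuous reading from a fixed snapshot (key names are taken from this diff; the map-based construction mirrors the tests below):

    Map<String, String> conf = new HashMap<>();
    conf.put("geaflow.dsl.paimon.source.mode", "STREAM");       // default: BATCH
    conf.put("geaflow.dsl.paimon.scan.mode", "from-snapshot");  // default: "latest"
    conf.put("geaflow.dsl.paimon.scan.snapshot.id", "1");       // only required for from-snapshot
    Configuration tableConf = new Configuration(conf);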
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java
index d38c360d6..1cbd523c6 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java
@@ -19,9 +19,6 @@
package org.apache.geaflow.dsl.connector.paimon;
-import java.util.Collections;
-import java.util.List;
-import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.type.Types;
import org.apache.geaflow.dsl.common.data.Row;
import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
@@ -29,22 +26,21 @@ import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
import org.apache.geaflow.dsl.common.types.StructType;
import org.apache.geaflow.dsl.common.types.TableField;
import org.apache.geaflow.dsl.common.types.TableSchema;
-import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
import org.apache.paimon.data.InternalRow;
-public class PaimonRecordDeserializer implements TableDeserializer<Object> {
+public class PaimonRecordDeserializer {
private StructType schema;
- private TableSchema tableSchema;
- @Override
- public void init(Configuration conf, StructType schema) {
- this.tableSchema = (TableSchema) schema;
- this.schema = this.tableSchema.getDataSchema();
+ public void init(StructType schema) {
+ TableSchema tableSchema = (TableSchema) schema;
+ this.schema = tableSchema.getDataSchema();
}
- @Override
- public List<Row> deserialize(Object record) {
+ public Row deserialize(Object record) {
+ if (record == null) {
+ return null;
+ }
InternalRow internalRow = (InternalRow) record;
assert internalRow.getFieldCount() == schema.size();
Object[] values = new Object[schema.size()];
@@ -70,8 +66,6 @@ public class PaimonRecordDeserializer implements TableDeserializer<Object> {
values[i] = internalRow.getLong(i);
break;
case Types.TYPE_NAME_STRING:
- values[i] = internalRow.getString(i);
- break;
case Types.TYPE_NAME_BINARY_STRING:
values[i] = internalRow.getString(i);
break;
@@ -80,6 +74,6 @@ public class PaimonRecordDeserializer implements TableDeserializer<Object> {
field.getType().getName());
}
}
- return Collections.singletonList(ObjectRow.create(values));
+ return ObjectRow.create(values);
}
}
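With this change the class no longer implements TableDeserializer and is no longer handed out via getDeserializer(); the source now drives it directly, one InternalRow at a time. A minimal sketch of the new call pattern ("tableSchema" and "internalRow" assumed in scope):

    PaimonRecordDeserializer deserializer = new PaimonRecordDeserializer();
    deserializer.init(tableSchema);                   // unwraps the TableSchema's data schema
    Row row = deserializer.deserialize(internalRow);  // one GeaFlow Row per InternalRow; null in, null out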
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java
index b980db11a..f5ba0eb59 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java
@@ -21,20 +21,18 @@ package org.apache.geaflow.dsl.connector.paimon;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.geaflow.api.context.RuntimeContext;
import org.apache.geaflow.common.config.Configuration;
-import org.apache.geaflow.common.config.keys.DSLConfigKeys;
import org.apache.geaflow.common.utils.GsonUtil;
import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
import org.apache.geaflow.dsl.common.types.TableSchema;
-import org.apache.geaflow.dsl.common.util.Windows;
import org.apache.geaflow.dsl.connector.api.FetchData;
import org.apache.geaflow.dsl.connector.api.Offset;
import org.apache.geaflow.dsl.connector.api.Partition;
@@ -50,19 +48,21 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.utils.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/** Paimon table source. */
public class PaimonTableSource implements TableSource {
private static final Logger LOGGER = LoggerFactory.getLogger(PaimonTableSource.class);
private Configuration tableConf;
private TableSchema tableSchema;
- private boolean isAllWindow;
private String path;
private Map<String, String> options;
@@ -70,17 +70,18 @@ public class PaimonTableSource implements TableSource {
private Map<String, String> configs;
private String database;
private String table;
+ private SourceMode sourceMode;
+ private final PaimonRecordDeserializer deserializer = new PaimonRecordDeserializer();
- private transient CatalogContext catalogContext;
- private transient Catalog catalog;
+ private transient long fromSnapshot;
private transient ReadBuilder readBuilder;
- private transient Map<PaimonPartition, RecordReader<InternalRow>> partition2Reader;
- private transient Map<PaimonPartition, PaimonOffset> partition2InnerOffset;
+ private transient Map<Split, CloseableIterator<InternalRow>> iterators;
+ private transient Map<Split, RecordReader<InternalRow>> readers;
+ private transient Map<PaimonPartition, PaimonOffset> offsets;
@Override
public void init(Configuration tableConf, TableSchema tableSchema) {
this.tableConf = tableConf;
- this.isAllWindow = tableConf.getLong(DSLConfigKeys.GEAFLOW_DSL_WINDOW_SIZE) == Windows.SIZE_OF_ALL_WINDOW;
this.tableSchema = tableSchema;
this.path = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_WAREHOUSE, "");
this.options = new HashMap<>();
@@ -89,71 +90,65 @@ public class PaimonTableSource implements TableSource {
String optionJson = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_OPTIONS_JSON);
Map<String, String> userOptions = GsonUtil.parse(optionJson);
if (userOptions != null) {
- for (Map.Entry<String, String> entry : userOptions.entrySet()) {
- options.put(entry.getKey(), entry.getValue());
- }
+ options.putAll(userOptions);
}
this.configJson = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_CONFIGURATION_JSON, "");
if (!StringUtils.isBlank(configJson)) {
Map<String, String> userConfig = GsonUtil.parse(configJson);
if (userConfig != null) {
- for (Map.Entry<String, String> entry : userConfig.entrySet()) {
- configs.put(entry.getKey(), entry.getValue());
- }
+ configs.putAll(userConfig);
}
}
}
this.database = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_DATABASE_NAME);
this.table = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_TABLE_NAME);
+ this.sourceMode = SourceMode.valueOf(tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SOURCE_MODE).toUpperCase());
}
@Override
public List<Partition> listPartitions(int parallelism) {
- return listPartitions();
+ List<Partition> partitions = new ArrayList<>(parallelism);
+ for (int i = 0; i < parallelism; i++) {
+ partitions.add(new PaimonPartition(database, table));
+ }
+ return partitions;
+ }
+
+ @Override
+ public List<Partition> listPartitions() {
+ throw new UnsupportedOperationException("Please use listPartitions(int parallelism) instead");
}
@Override
public void open(RuntimeContext context) {
- if (StringUtils.isBlank(this.path)) {
- if (StringUtils.isBlank(this.configJson)) {
- this.catalogContext = Objects.requireNonNull(CatalogContext.create(new Options(options)));
- } else {
- org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
- for (Map.Entry<String, String> entry : configs.entrySet()) {
- hadoopConf.set(entry.getKey(), entry.getValue());
- }
- this.catalogContext = Objects.requireNonNull(CatalogContext.create(new Options(options), hadoopConf));
- }
- } else {
- this.catalogContext = Objects.requireNonNull(CatalogContext.create(new Path(path)));
- }
- this.catalog = Objects.requireNonNull(CatalogFactory.createCatalog(this.catalogContext));
+ Catalog catalog = getPaimonCatalog();
Identifier identifier = Identifier.create(database, table);
try {
this.readBuilder = Objects.requireNonNull(catalog.getTable(identifier).newReadBuilder());
+ if (tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SCAN_MODE)
+ .equalsIgnoreCase(StartupMode.FROM_SNAPSHOT.getValue())
+ && tableConf.contains(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SCAN_SNAPSHOT_ID)) {
+ this.fromSnapshot = tableConf.getLong(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SCAN_SNAPSHOT_ID);
+ } else {
+ this.fromSnapshot = catalog.getTable(identifier).latestSnapshotId().orElse(1L);
+ }
+ LOGGER.info("New partition will start from snapshot: {}", this.fromSnapshot);
} catch (TableNotExistException e) {
throw new GeaFlowDSLException("Table: {} in db: {} not exists.", table, database);
}
}
- this.partition2Reader = new HashMap<>();
- this.partition2InnerOffset = new HashMap<>();
+ this.iterators = new HashMap<>();
+ this.readers = new HashMap<>();
+ this.offsets = new HashMap<>();
LOGGER.info("Open paimon source, tableConf: {}, tableSchema: {}, path: {}, options: "
+ "{}, configs: {}, database: {}, tableName: {}", tableConf, tableSchema, path,
options, configs, database, table);
- }
-
- @Override
- public List<Partition> listPartitions() {
- List<Split> splits = isAllWindow ? readBuilder.newScan().plan().splits() :
- readBuilder.newStreamScan().plan().splits();
- return splits.stream().map(split -> new PaimonPartition(database, table, split)).collect(Collectors.toList());
+ this.deserializer.init(tableSchema);
}
@Override
public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
- return (TableDeserializer<IN>) new PaimonRecordDeserializer();
+ return null;
}
@Override
@@ -162,64 +157,220 @@ public class PaimonTableSource implements TableSource {
PaimonPartition paimonPartition = (PaimonPartition) partition;
assert paimonPartition.getDatabase().equals(this.database)
&& paimonPartition.getTable().equals(this.table);
- RecordReader reader = partition2Reader.getOrDefault(partition,
- readBuilder.newRead().createReader(paimonPartition.getSplit()));
- partition2Reader.put(paimonPartition, reader);
- PaimonOffset innerOffset = partition2InnerOffset.getOrDefault(partition,
- new PaimonOffset());
- partition2InnerOffset.put(paimonPartition, innerOffset);
+ long skip = 0;
+ PaimonOffset innerOffset = offsets.get(partition);
+ if (innerOffset == null) {
+ // first fetch, use custom specified snapshot id
+ innerOffset = new PaimonOffset(fromSnapshot);
+ }
+
+ // if startOffset is specified, use it and try reset innerOffset
+ if ((startOffset.isPresent() && !startOffset.get().equals(innerOffset))) {
+ skip = Math.abs(startOffset.get().getOffset() - innerOffset.getOffset());
+ innerOffset = PaimonOffset.from((PaimonOffset) startOffset.get());
+ }
+ if (paimonPartition.getCurrentSnapshot() != innerOffset.getSnapshotId()) {
+ paimonPartition.reset(loadSplitsFrom(innerOffset.getSnapshotId()));
+ }
+
+ Split split = paimonPartition.seek(innerOffset.getSplitIndex());
+ if (split == null) {
+ if (sourceMode == SourceMode.BATCH) {
+ LOGGER.info("No more split to fetch");
+ return FetchData.createBatchFetch(Collections.emptyIterator(), innerOffset);
+ } else {
+ LOGGER.debug("Snapshot {} not ready now", innerOffset.getSnapshotId());
+ return FetchData.createStreamFetch(Collections.emptyList(), innerOffset, false);
+ }
+ }
+ offsets.put(paimonPartition, innerOffset);
- if (startOffset.isPresent() && !startOffset.get().equals(innerOffset)) {
- throw new GeaFlowDSLException("Paimon connector not support reset offset.");
+ CloseableIterator<InternalRow> iterator = createRecordIterator(split);
+ if (skip > 0) {
+ while (iterator.hasNext() && skip > 0) {
+ iterator.next();
+ skip--;
+ }
}
- CloseableIterator iterator = reader.toCloseableIterator();
switch (windowInfo.getType()) {
case ALL_WINDOW:
- return FetchData.createBatchFetch(iterator, new PaimonOffset());
+ return FetchData.createBatchFetch(new IteratorWrapper(iterator, deserializer), new PaimonOffset());
case SIZE_TUMBLING_WINDOW:
List<Object> readContents = new ArrayList<>();
- long i = 0;
- for (; i < windowInfo.windowSize(); i++) {
+ long advance = 0;
+ for (long i = 0; i < windowInfo.windowSize(); i++) {
if (iterator.hasNext()) {
- readContents.add(iterator.next());
+ advance++;
+ readContents.add(deserializer.deserialize(iterator.next()));
} else {
- break;
+ if (sourceMode == SourceMode.BATCH) {
+ break;
+ }
+ try {
+ removeRecordIterator(split);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ innerOffset = innerOffset.nextSplit();
+ Split seek = paimonPartition.seek(innerOffset.getSplitIndex());
+ if (seek == null) {
+ // all split finished, try read next snapshot
+ innerOffset = innerOffset.nextSnapshot();
+ paimonPartition.reset(loadSplitsFrom(innerOffset.getSnapshotId()));
+ }
+ offsets.put(paimonPartition, innerOffset);
+ advance = 0L;
+ seek = paimonPartition.seek(innerOffset.getSplitIndex());
+ if (seek == null) {
+ // no new snapshot discovered, retry next turn
+ break;
+ }
+ iterator = createRecordIterator(seek);
+ i--;
}
}
- long nextOffset = innerOffset.getOffset() + i;
- boolean isFinished = !iterator.hasNext();
- return FetchData.createStreamFetch(readContents, new PaimonOffset(nextOffset), isFinished);
+ PaimonOffset next = innerOffset.advance(advance);
+ offsets.put(paimonPartition, next);
+ boolean isFinished = sourceMode != SourceMode.STREAM && !iterator.hasNext();
+ return FetchData.createStreamFetch(readContents, next, isFinished);
default:
throw new GeaFlowDSLException("Paimon not support window:{}", windowInfo.getType());
}
}
+ /**
+ * Create a paimon record iterator.
+ * @param split the paimon data split
+ * @return a paimon record iterator
+ * @throws IOException if error occurs
+ */
+ private CloseableIterator<InternalRow> createRecordIterator(Split split) throws IOException {
+ CloseableIterator<InternalRow> iterator = iterators.get(split);
+ if (iterator != null) {
+ return iterator;
+ }
+ RecordReader<InternalRow> reader = readBuilder.newRead().createReader(split);
+ CloseableIterator<InternalRow> closeableIterator = reader.toCloseableIterator();
+ readers.put(split, reader);
+ iterators.put(split, closeableIterator);
+ return closeableIterator;
+ }
+
+ /**
+ * Remove the paimon record iterator and reader.
+ * @param split the paimon data split
+ * @throws Exception if error occurs
+ */
+ private void removeRecordIterator(Split split) throws Exception {
+ CloseableIterator<InternalRow> removed = iterators.remove(split);
+ if (removed != null) {
+ removed.close();
+ }
+ RecordReader<InternalRow> reader = readers.remove(split);
+ if (reader != null) {
+ reader.close();
+ }
+ }
+
+ /**
+ * Load splits from snapshot.
+ * @param snapshotId the snapshot id
+ * @return all splits
+ */
+ private List<Split> loadSplitsFrom(long snapshotId) {
+ StreamTableScan streamTableScan = readBuilder.newStreamScan();
+ streamTableScan.restore(snapshotId);
+ long start = System.currentTimeMillis();
+ List<Split> splits = streamTableScan.plan().splits();
+ LOGGER.debug("Load splits from snapshot: {}, cost: {}ms", snapshotId, System.currentTimeMillis() - start);
+ return splits;
+ }
+
+ /**
+ * Get paimon catalog.
+ * @return the paimon catalog.
+ */
+ private Catalog getPaimonCatalog() {
+ CatalogContext catalogContext;
+ if (StringUtils.isBlank(this.path)) {
+ if (StringUtils.isBlank(this.configJson)) {
+ catalogContext = Objects.requireNonNull(CatalogContext.create(new Options(options)));
+ } else {
+ org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+ for (Map.Entry<String, String> entry : configs.entrySet()) {
+ hadoopConf.set(entry.getKey(), entry.getValue());
+ }
+ catalogContext = Objects.requireNonNull(CatalogContext.create(new Options(options), hadoopConf));
+ }
+ } else {
+ catalogContext = Objects.requireNonNull(CatalogContext.create(new Path(path)));
+ }
+ return Objects.requireNonNull(CatalogFactory.createCatalog(catalogContext));
+ }
+
@Override
public void close() {
- for (RecordReader reader : partition2Reader.values()) {
+ for (CloseableIterator<InternalRow> reader : iterators.values()) {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (Exception e) {
+ throw new GeaFlowDSLException("Error occurs when close paimon iterator.", e);
+ }
+ }
+ }
+ for (RecordReader<InternalRow> reader : readers.values()) {
if (reader != null) {
try {
reader.close();
- } catch (IOException e) {
+ } catch (Exception e) {
throw new GeaFlowDSLException("Error occurs when close paimon reader.", e);
}
}
}
- partition2Reader.clear();
- partition2InnerOffset.clear();
+ iterators.clear();
+ offsets.clear();
}
public static class PaimonPartition implements Partition {
private final String database;
private final String table;
- private final Split split;
- public PaimonPartition(String database, String table, Split split) {
+ private int index;
+ private int parallel;
+
+ // current snapshot id
+ private transient long currentSnapshot;
+ // assigned splits for this partition
+ private final transient List<Split> splits;
+
+ public PaimonPartition(String database, String table) {
this.database = Objects.requireNonNull(database);
this.table = Objects.requireNonNull(table);
- this.split = Objects.requireNonNull(split);
+ this.splits = new ArrayList<>();
+ this.currentSnapshot = -1L;
+ this.index = 0;
+ this.parallel = 1;
+ }
+
+ public void reset(List<Split> splits) {
+ this.splits.clear();
+ for (Split split : splits) {
+ DataSplit dataSplit = (DataSplit) split;
+ int hash = Objects.hash(dataSplit.bucket(), dataSplit.partition());
+ if (hash % parallel == index) {
+ this.splits.add(split);
+ this.currentSnapshot = dataSplit.snapshotId();
+ }
+ }
+ if (!this.splits.isEmpty()) {
+ LOGGER.info("Assign paimon split(s) for table {}.{}, snapshot: {}, split size: {}",
+ database, table, this.currentSnapshot, this.splits.size());
+ }
}
@Override
@@ -227,9 +378,39 @@ public class PaimonTableSource implements TableSource {
return database + "-" + table;
}
+ public String getDatabase() {
+ return database;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public long getCurrentSnapshot() {
+ return currentSnapshot;
+ }
+
+ /**
+ * Seek the split by index.
+ * @param splitIndex the split index
+ * @return the split to read
+ */
+ public Split seek(int splitIndex) {
+ if (splitIndex >= splits.size()) {
+ return null;
+ }
+ return splits.get(splitIndex);
+ }
+
+ @Override
+ public void setIndex(int index, int parallel) {
+ this.parallel = parallel;
+ this.index = index;
+ }
+
@Override
public int hashCode() {
- return Objects.hash(database, table, split);
+ return Objects.hash(database, table, index);
}
@Override
@@ -242,44 +423,50 @@ public class PaimonTableSource implements TableSource {
}
PaimonPartition that = (PaimonPartition) o;
return Objects.equals(database, that.database) && Objects.equals(
- table, that.table) && Objects.equals(
- split, that.split);
- }
-
- public String getDatabase() {
- return database;
- }
-
- public String getTable() {
- return table;
- }
-
- public Split getSplit() {
- return split;
- }
-
- @Override
- public void setIndex(int index, int parallel) {
+ table, that.table) && Objects.equals(index, that.index);
}
}
-
public static class PaimonOffset implements Offset {
+ // if partition snapshot id is not equal to current snapshot id,
+ // need to load splits
+ private final long snapshotId;
+ // the index of current partition assigned splits
+ private final int splitIndex;
+ // the offset of current split
private final long offset;
+ public static PaimonOffset from(PaimonOffset offset) {
+ return new PaimonOffset(offset.snapshotId, offset.splitIndex, offset.offset);
+ }
public PaimonOffset() {
- this.offset = 0L;
+ this(1L);
+ }
+
+ public PaimonOffset(long snapshotId) {
+ this(snapshotId, 0, 0);
}
- public PaimonOffset(long offset) {
+ public PaimonOffset(long snapshotId, int splitIndex, long offset) {
+ this.snapshotId = snapshotId;
+ this.splitIndex = splitIndex;
this.offset = offset;
}
@Override
public String humanReadable() {
- return String.valueOf(offset);
+ return String.format("snapshot %d, split index %d, offset %d", snapshotId, splitIndex, offset);
+ }
+
+ public long getSnapshotId() {
+ return snapshotId;
+ }
+
+ public int getSplitIndex() {
+ return splitIndex;
}
@Override
@@ -287,6 +474,21 @@ public class PaimonTableSource implements TableSource {
return offset;
}
+ public PaimonOffset advance(long rows) {
+ if (rows == 0) {
+ return this;
+ }
+ return new PaimonOffset(snapshotId, splitIndex, offset + rows);
+ }
+
+ public PaimonOffset nextSplit() {
+ return new PaimonOffset(snapshotId, splitIndex + 1, 0);
+ }
+
+ public PaimonOffset nextSnapshot() {
+ return new PaimonOffset(snapshotId + 1, 0, 0);
+ }
+
@Override
public boolean isTimestamp() {
return false;
@@ -301,12 +503,12 @@ public class PaimonTableSource implements TableSource {
return false;
}
PaimonOffset that = (PaimonOffset) o;
- return offset == that.offset;
+ return snapshotId == that.snapshotId && splitIndex == that.splitIndex && offset == that.offset;
}
@Override
public int hashCode() {
- return Objects.hash(offset);
+ return Objects.hash(snapshotId, splitIndex, offset);
}
}
}
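The offset is now a (snapshotId, splitIndex, offset) triple rather than a single row count. A small sketch of how the three coordinates progress (values are illustrative only):

    PaimonOffset offset = new PaimonOffset(5L);   // snapshot 5, split index 0, row offset 0
    offset = offset.advance(100);                 // 100 rows consumed within the current split
    offset = offset.nextSplit();                  // split index 1, row offset resets to 0
    offset = offset.nextSnapshot();               // snapshot 6, split index 0, offset 0
    offset.humanReadable();                       // "snapshot 6, split index 0, offset 0"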
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/SourceMode.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/SourceMode.java
new file mode 100644
index 000000000..637de0b0e
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/SourceMode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+/** Paimon source mode. */
+public enum SourceMode {
+ BATCH,
+ STREAM
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/StartupMode.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/StartupMode.java
new file mode 100644
index 000000000..56182aab7
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/StartupMode.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+public enum StartupMode {
+ LATEST(
+ "latest",
+ "Read changes starting from the latest snapshot."),
+
+ FROM_SNAPSHOT(
+ "from-snapshot",
+ "For streaming sources, continuously reads changes starting from snapshot "
+ + "specified by \"scan.snapshot-id\", without producing a snapshot at the beginning. "
+ + "For batch sources, produces a snapshot specified by \"scan.snapshot-id\" "
+ + "but does not read new changes.");
+
+ private final String value;
+ private final String description;
+
+ StartupMode(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+}
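The open() method above matches the scan-mode config string against these values with equalsIgnoreCase rather than an enum lookup. A hedged sketch of a helper one could add if stricter validation were wanted; fromValue does not exist in this commit:

    // Hypothetical helper, not part of this commit.
    static StartupMode fromValue(String value) {
        for (StartupMode mode : StartupMode.values()) {
            if (mode.getValue().equalsIgnoreCase(value)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unknown scan mode: " + value);
    }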
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java
index 84746b465..edd36ab51 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java
@@ -32,20 +32,21 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.config.keys.ConnectorConfigKeys;
+import org.apache.geaflow.common.tuple.Tuple;
import org.apache.geaflow.common.type.Types;
-import org.apache.geaflow.dsl.common.data.Row;
import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
import org.apache.geaflow.dsl.common.types.StructType;
import org.apache.geaflow.dsl.common.types.TableField;
import org.apache.geaflow.dsl.common.types.TableSchema;
import org.apache.geaflow.dsl.connector.api.FetchData;
+import org.apache.geaflow.dsl.connector.api.Offset;
import org.apache.geaflow.dsl.connector.api.Partition;
import org.apache.geaflow.dsl.connector.api.TableConnector;
import org.apache.geaflow.dsl.connector.api.TableReadableConnector;
import org.apache.geaflow.dsl.connector.api.TableSource;
-import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
import org.apache.geaflow.dsl.connector.api.util.ConnectorFactory;
import org.apache.geaflow.dsl.connector.api.window.AllFetchWindow;
+import org.apache.geaflow.dsl.connector.api.window.SizeFetchWindow;
import org.apache.geaflow.runtime.core.context.DefaultRuntimeContext;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
@@ -64,11 +65,28 @@ import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.VarCharType;
import org.testng.Assert;
+import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
+import org.testng.collections.Lists;
public class PaimonTableConnectorTest {
+ String tmpDir = "/tmp/geaflow/dsl/paimon/test/";
+ String db = "paimon_db";
+
+ GenericRow record1 = GenericRow.of(1, BinaryString.fromString("a1"), 10.0);
+ GenericRow record2 = GenericRow.of(2, BinaryString.fromString("ab"), 12.0);
+ GenericRow record3 = GenericRow.of(3, BinaryString.fromString("a3"), 12.0);
+ GenericRow record4 = GenericRow.of(4, BinaryString.fromString("bcd"), 15.0);
+ GenericRow record5 = GenericRow.of(5, BinaryString.fromString("a5"), 10.0);
+ GenericRow record6 = GenericRow.of(6, BinaryString.fromString("s1"), 9.0);
+ GenericRow record7 = GenericRow.of(7, BinaryString.fromString("sb"), 20.0);
+ GenericRow record8 = GenericRow.of(8, BinaryString.fromString("s3"), 16.0);
+ GenericRow record9 = GenericRow.of(9, BinaryString.fromString("bad"), 12.0);
+ GenericRow record10 = GenericRow.of(10, BinaryString.fromString("aa5"), 11.0);
+ GenericRow record11 = GenericRow.of(11, BinaryString.fromString("x11"), 11.2);
+
private final StructType dataSchema = new StructType(
new TableField("id", Types.INTEGER, false),
new TableField("name", Types.BINARY_STRING),
@@ -84,39 +102,37 @@ public class PaimonTableConnectorTest {
@BeforeTest
public void prepare() {
- String tmpDir = "/tmp/geaflow/dsl/paimon/test/";
FileUtils.deleteQuietly(new File(tmpDir));
- String db = "paimon_db";
- String tableName = "paimon_table";
+ }
+
+ @AfterTest
+ public void clean() {
+ FileUtils.deleteQuietly(new File(tmpDir));
+ }
+
+ public void createSnapshot(String tableName, List<GenericRow> rows) {
CatalogContext catalogContext =
- Objects.requireNonNull(CatalogContext.create(new Path(tmpDir)));
+ Objects.requireNonNull(CatalogContext.create(new Path(tmpDir)));
Catalog catalog = Objects.requireNonNull(CatalogFactory.createCatalog(catalogContext));
try {
- catalog.createDatabase(db, false);
+ catalog.createDatabase(db, true);
List<String> dbs = catalog.listDatabases();
assert dbs.get(0).equals(db);
Identifier identifier = new Identifier(db, tableName);
catalog.createTable(identifier,
- Schema.newBuilder()
- .column("id", new IntType())
- .column("name", new VarCharType(256))
- .column("price", new DoubleType())
- .build(), false);
+ Schema.newBuilder()
+ .column("id", new IntType())
+ .column("name", new VarCharType(256))
+ .column("price", new DoubleType())
+ .build(), true);
List<String> tables = catalog.listTables(dbs.get(0));
- assert tables.get(0).equals(tableName);
+ assert tables.contains(tableName);
Table table = catalog.getTable(identifier);
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
BatchTableWrite write = writeBuilder.newWrite();
- GenericRow record1 = GenericRow.of(1, BinaryString.fromString("a1"), 10.0);
- GenericRow record2 = GenericRow.of(2, BinaryString.fromString("ab"), 12.0);
- GenericRow record3 = GenericRow.of(3, BinaryString.fromString("a3"), 12.0);
- GenericRow record4 = GenericRow.of(4, BinaryString.fromString("bcd"), 15.0);
- GenericRow record5 = GenericRow.of(5, BinaryString.fromString("a5"), 10.0);
- write.write(record1);
- write.write(record2);
- write.write(record3);
- write.write(record4);
- write.write(record5);
+ for (GenericRow row : rows) {
+ write.write(row);
+ }
List<CommitMessage> messages = write.prepareCommit();
BatchTableCommit commit = writeBuilder.newCommit();
commit.commit(messages);
@@ -126,43 +142,246 @@ public class PaimonTableConnectorTest {
}
@Test
- public void testReadPaimon() throws IOException {
- String tmpDir = "/tmp/geaflow/dsl/paimon/test/";
- String db = "paimon_db";
- String table = "paimon_table";
+ public void testReadPaimonStreamMode() throws IOException {
+ String table = "paimon_stream_table";
+
+ Tuple<TableSource, Configuration> tuple = createTableSource(table, true);
+ TableSource tableSource = tuple.f0;
+ Configuration tableConf = tuple.f1;
+
+ tableSource.init(tableConf, tableSchema);
+
+ // create snapshot 1
+ createSnapshot(table, Lists.newArrayList(record1, record2, record3, record4, record5));
+
+ tableSource.open(new DefaultRuntimeContext(tableConf));
+
+ List<Partition> partitions = tableSource.listPartitions(1);
+
+ Offset nextOffset = null;
+
+ List<Object> readRows = new ArrayList<>();
+ for (Partition partition : partitions) {
+ FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new SizeFetchWindow(0L, 4));
+ while (rows.getDataIterator().hasNext()) {
+ readRows.add(rows.getDataIterator().next());
+ }
+ Assert.assertFalse(rows.isFinish());
+ nextOffset = rows.getNextOffset();
+ }
+ Assert.assertEquals(StringUtils.join(readRows, "\n"),
+ "[1, a1, 10.0]\n"
+ + "[2, ab, 12.0]\n"
+ + "[3, a3, 12.0]\n"
+ + "[4, bcd, 15.0]");
+ readRows.clear();
+
+ // create snapshot 2
+ createSnapshot(table, Lists.newArrayList(record6, record7, record8, record9, record10));
+
+ for (Partition partition : partitions) {
+ FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new SizeFetchWindow(1L, 4));
+ while (rows.getDataIterator().hasNext()) {
+ readRows.add(rows.getDataIterator().next());
+ }
+ Assert.assertFalse(rows.isFinish());
+ }
+ Assert.assertEquals(StringUtils.join(readRows, "\n"),
+ "[5, a5, 10.0]\n"
+ + "[6, s1, 9.0]\n"
+ + "[7, sb, 20.0]\n"
+ + "[8, s3, 16.0]");
+ readRows.clear();
+
+ for (Partition partition : partitions) {
+ FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new SizeFetchWindow(2L, 4));
+ while (rows.getDataIterator().hasNext()) {
+ readRows.add(rows.getDataIterator().next());
+ }
+ Assert.assertFalse(rows.isFinish());
+ }
+ Assert.assertEquals(StringUtils.join(readRows, "\n"),
+ "[9, bad, 12.0]\n"
+ + "[10, aa5, 11.0]");
+ readRows.clear();
+
+ // no new snapshot
+ for (int i = 0; i < 2; i++) {
+ for (Partition partition : partitions) {
+ FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new SizeFetchWindow(3L, 4));
+ while (rows.getDataIterator().hasNext()) {
+ readRows.add(rows.getDataIterator().next());
+ }
+ Assert.assertFalse(rows.isFinish());
+ Assert.assertTrue(readRows.isEmpty());
+ }
+ }
+
+ // create snapshot 3
+ createSnapshot(table, Lists.newArrayList(record11));
+ for (Partition partition : partitions) {
+ FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new SizeFetchWindow(3L, 4));
+ while (rows.getDataIterator().hasNext()) {
+ readRows.add(rows.getDataIterator().next());
+ }
+ Assert.assertFalse(rows.isFinish());
+ Assert.assertFalse(readRows.isEmpty());
+ }
+ Assert.assertEquals(StringUtils.join(readRows, "\n"),
+ "[11, x11, 11.2]");
+ readRows.clear();
+
+ // test restore from offset
+ for (Partition partition : partitions) {
+ FetchData<Object> rows = tableSource.fetch(partition, Optional.of(nextOffset), new SizeFetchWindow(4L, 4));
+ while (rows.getDataIterator().hasNext()) {
+ readRows.add(rows.getDataIterator().next());
+ }
+ Assert.assertFalse(rows.isFinish());
+ }
+ Assert.assertEquals(StringUtils.join(readRows, "\n"),
+ "[5, a5, 10.0]\n"
+ + "[6, s1, 9.0]\n"
+ + "[7, sb, 20.0]\n"
+ + "[8, s3, 16.0]");
+ }
+
+ @Test
+ public void testReadPaimonBatchMode() throws IOException {
+ String table = "paimon_batch_table";
+
+ Tuple<TableSource, Configuration> tuple = createTableSource(table, false);
+ TableSource tableSource = tuple.f0;
+ Configuration tableConf = tuple.f1;
+ tableSource.init(tableConf, tableSchema);
+
+ createSnapshot(table, Lists.newArrayList(record1, record2, record3, record4, record5));
+
+ tableSource.open(new DefaultRuntimeContext(tableConf));
+
+ List<Partition> partitions = tableSource.listPartitions(1);
+
+ List<Object> readRows = new ArrayList<>();
+ for (Partition partition : partitions) {
+ FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new SizeFetchWindow(0L, 4));
+ while (rows.getDataIterator().hasNext()) {
+ readRows.add(rows.getDataIterator().next());
+ }
+ Assert.assertFalse(rows.isFinish());
+ }
+ Assert.assertEquals(StringUtils.join(readRows, "\n"),
+ "[1, a1, 10.0]\n"
+ + "[2, ab, 12.0]\n"
+ + "[3, a3, 12.0]\n"
+ + "[4, bcd, 15.0]");
+ readRows.clear();
+
+ createSnapshot(table, Lists.newArrayList(record6, record7, record8, record9, record10));
+
+ for (Partition partition : partitions) {
+ FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new SizeFetchWindow(1L, 4));
+ while (rows.getDataIterator().hasNext()) {
+ readRows.add(rows.getDataIterator().next());
+ }
+ Assert.assertTrue(rows.isFinish());
+ }
+ Assert.assertEquals(StringUtils.join(readRows, "\n"),
+ "[5, a5, 10.0]");
+ readRows.clear();
+ }
+
+ @Test
+ public void testReadPaimonFromSnapshot() throws IOException {
+ String table = "paimon_batch_table_2";
TableConnector tableConnector = ConnectorFactory.loadConnector("PAIMON");
Assert.assertEquals(tableConnector.getType().toLowerCase(Locale.ROOT), "paimon");
TableReadableConnector readableConnector = (TableReadableConnector) tableConnector;
-
- Map<String, String> tableConfMap = new HashMap<>();
- tableConfMap.put(ConnectorConfigKeys.GEAFLOW_DSL_FILE_PATH.getKey(), tmpDir);
- tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_WAREHOUSE.getKey(), tmpDir);
- tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_DATABASE_NAME.getKey(), db);
- tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_TABLE_NAME.getKey(), table);
+ Map<String, String> tableConfMap = getTableConf(table).getConfigMap();
+ tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SOURCE_MODE.getKey(), SourceMode.BATCH.name());
+ tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SCAN_MODE.getKey(), "From-Snapshot");
+ tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SCAN_SNAPSHOT_ID.getKey(), "1");
Configuration tableConf = new Configuration(tableConfMap);
TableSource tableSource = readableConnector.createSource(tableConf);
tableSource.init(tableConf, tableSchema);
+ createSnapshot(table, Lists.newArrayList(record1, record2, record3, record4, record5));
+ createSnapshot(table, Lists.newArrayList(record6, record7, record8, record9, record10));
+
tableSource.open(new DefaultRuntimeContext(tableConf));
- List<Partition> partitions = tableSource.listPartitions();
+ List<Partition> partitions = tableSource.listPartitions(1);
- TableDeserializer deserializer = tableSource.getDeserializer(tableConf);
- deserializer.init(tableConf, tableSchema);
- List<Row> readRows = new ArrayList<>();
+ List<Object> readRows = new ArrayList<>();
for (Partition partition : partitions) {
- FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new AllFetchWindow(-1L));
+ FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new AllFetchWindow(0L));
while (rows.getDataIterator().hasNext()) {
- readRows.addAll(deserializer.deserialize(rows.getDataIterator().next()));
+ readRows.add(rows.getDataIterator().next());
}
+ Assert.assertTrue(rows.isFinish());
}
Assert.assertEquals(StringUtils.join(readRows, "\n"),
- "[1, a1, 10.0]\n"
- + "[2, ab, 12.0]\n"
- + "[3, a3, 12.0]\n"
- + "[4, bcd, 15.0]\n"
- + "[5, a5, 10.0]");
+ "[1, a1, 10.0]\n"
+ + "[2, ab, 12.0]\n"
+ + "[3, a3, 12.0]\n"
+ + "[4, bcd, 15.0]\n"
+ + "[5, a5, 10.0]");
+ }
+
+ @Test
+ public void testReadPaimonFromLatestSnapshot() throws IOException {
+ String table = "paimon_batch_table_3";
+
+ TableConnector tableConnector = ConnectorFactory.loadConnector("PAIMON");
+ Assert.assertEquals(tableConnector.getType().toLowerCase(Locale.ROOT), "paimon");
+ TableReadableConnector readableConnector = (TableReadableConnector) tableConnector;
+ Configuration tableConf = getTableConf(table);
+ tableConf.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SOURCE_MODE.getKey(), SourceMode.BATCH.name());
+ TableSource tableSource = readableConnector.createSource(tableConf);
+ tableSource.init(tableConf, tableSchema);
+
+ createSnapshot(table, Lists.newArrayList(record1, record2, record3, record4, record5));
+ createSnapshot(table, Lists.newArrayList(record6, record7, record8, record9, record10));
+
+ tableSource.open(new DefaultRuntimeContext(tableConf));
+
+ List<Partition> partitions = tableSource.listPartitions(1);
+
+ List<Object> readRows = new ArrayList<>();
+ for (Partition partition : partitions) {
+ FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new AllFetchWindow(0L));
+ while (rows.getDataIterator().hasNext()) {
+ readRows.add(rows.getDataIterator().next());
+ }
+ Assert.assertTrue(rows.isFinish());
+ }
+ Assert.assertEquals(StringUtils.join(readRows, "\n"),
+ "[6, s1, 9.0]\n"
+ + "[7, sb, 20.0]\n"
+ + "[8, s3, 16.0]\n"
+ + "[9, bad, 12.0]\n"
+ + "[10, aa5, 11.0]");
+ }
+
+ private Configuration getTableConf(String table) {
+ Map<String, String> tableConfMap = new HashMap<>();
+ tableConfMap.put(ConnectorConfigKeys.GEAFLOW_DSL_FILE_PATH.getKey(), tmpDir);
+ tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_WAREHOUSE.getKey(), tmpDir);
+ tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_DATABASE_NAME.getKey(), db);
+ tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_TABLE_NAME.getKey(), table);
+ return new Configuration(tableConfMap);
+ }
+
+ private Tuple<TableSource, Configuration> createTableSource(String table, boolean streamMode) {
+ TableConnector tableConnector = ConnectorFactory.loadConnector("PAIMON");
+ Assert.assertEquals(tableConnector.getType().toLowerCase(Locale.ROOT), "paimon");
+ TableReadableConnector readableConnector = (TableReadableConnector) tableConnector;
+ Map<String, String> tableConfMap = getTableConf(table).getConfigMap();
+ if (streamMode) {
+ tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_SOURCE_MODE.getKey(), SourceMode.STREAM.name());
+ }
+ Configuration tableConf = new Configuration(tableConfMap);
+ return Tuple.of(readableConnector.createSource(tableConf), tableConf);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]