This is an automated email from the ASF dual-hosted git repository. qingwzhao pushed a commit to branch paimon-source-connector in repository https://gitbox.apache.org/repos/asf/geaflow.git
commit 7e32cdf0120001c9e50a4d91d3fe9f5e148010b5 Author: qingwen.zqw <[email protected]> AuthorDate: Mon Aug 11 14:44:30 2025 +0800 feat: add paimon source connector --- .../geaflow-dsl-connector-paimon/pom.xml | 44 +++ .../dsl/connector/paimon/PaimonConfigKeys.java | 52 ++++ .../connector/paimon/PaimonRecordDeserializer.java | 85 ++++++ .../dsl/connector/paimon/PaimonTableConnector.java | 39 +++ .../dsl/connector/paimon/PaimonTableSource.java | 312 +++++++++++++++++++++ ...apache.geaflow.dsl.connector.api.TableConnector | 19 ++ .../connector/paimon/PaimonTableConnectorTest.java | 168 +++++++++++ geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml | 1 + 8 files changed, 720 insertions(+) diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/pom.xml b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/pom.xml new file mode 100644 index 00000000..1e68e9d9 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/pom.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.geaflow</groupId> + <artifactId>geaflow-dsl-connector</artifactId> + <version>0.6.8-SNAPSHOT</version> + </parent> + <artifactId>geaflow-dsl-connector-paimon</artifactId> + <name>Archetype - geaflow-dsl-connector-paimon</name> + + <dependencies> + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-bundle</artifactId> + <version>${paimon.version}</version> + </dependency> + <dependency> + <groupId>org.apache.geaflow</groupId> + <artifactId>geaflow-dsl-connector-api</artifactId> + </dependency> + </dependencies> +</project> diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java new file mode 100644 index 00000000..106ef026 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonConfigKeys.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.connector.paimon; + +import org.apache.geaflow.common.config.ConfigKey; +import org.apache.geaflow.common.config.ConfigKeys; + +public class PaimonConfigKeys { + + public static final ConfigKey GEAFLOW_DSL_PAIMON_WAREHOUSE = ConfigKeys + .key("geaflow.dsl.paimon.warehouse") + .noDefaultValue() + .description("The warehouse path for paimon catalog creation."); + + public static final ConfigKey GEAFLOW_DSL_PAIMON_OPTIONS_JSON = ConfigKeys + .key("geaflow.dsl.paimon.options.json") + .noDefaultValue() + .description("The options json for paimon catalog creation."); + + public static final ConfigKey GEAFLOW_DSL_PAIMON_CONFIGURATION_JSON = ConfigKeys + .key("geaflow.dsl.paimon.configuration.json") + .noDefaultValue() + .description("The configuration json for paimon catalog creation."); + + public static final ConfigKey GEAFLOW_DSL_PAIMON_DATABASE_NAME = ConfigKeys + .key("geaflow.dsl.paimon.database.name") + .noDefaultValue() + .description("The database name for paimon table."); + + public static final ConfigKey GEAFLOW_DSL_PAIMON_TABLE_NAME = ConfigKeys + .key("geaflow.dsl.paimon.table.name") + .noDefaultValue() + .description("The paimon table name to read."); + +} diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java new file mode 100644 index 
00000000..d38c360d --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonRecordDeserializer.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.apache.geaflow.dsl.common.types.TableField;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
+import org.apache.paimon.data.InternalRow;
+
+/**
+ * Converts a Paimon {@link InternalRow} into a single GeaFlow {@link Row}, reading every
+ * field positionally according to the data schema of the table.
+ */
+public class PaimonRecordDeserializer implements TableDeserializer<Object> {
+
+    private StructType schema;
+    private TableSchema tableSchema;
+
+    /**
+     * Stores the table schema and the data schema derived from it.
+     *
+     * @param conf   the table configuration (not used by this deserializer)
+     * @param schema the table schema; must be a {@link TableSchema}
+     */
+    @Override
+    public void init(Configuration conf, StructType schema) {
+        this.tableSchema = (TableSchema) schema;
+        this.schema = this.tableSchema.getDataSchema();
+    }
+
+    /**
+     * Reads all fields of the given Paimon row into one {@link ObjectRow}.
+     *
+     * @param record the record produced by the Paimon reader; must be an {@link InternalRow}
+     * @return a singleton list holding the converted row
+     * @throws GeaFlowDSLException if a field has a type this deserializer does not handle
+     */
+    @Override
+    public List<Row> deserialize(Object record) {
+        InternalRow internalRow = (InternalRow) record;
+        assert internalRow.getFieldCount() == schema.size();
+        int fieldCount = schema.size();
+        Object[] values = new Object[fieldCount];
+        for (int i = 0; i < fieldCount; i++) {
+            TableField field = this.schema.getField(i);
+            values[i] = readField(internalRow, i, field.getType().getName());
+        }
+        return Collections.singletonList(ObjectRow.create(values));
+    }
+
+    /**
+     * Reads one field, mapping a null field to {@code null} instead of calling a typed getter
+     * on it. The type is still validated first so an unsupported column always fails.
+     */
+    private static Object readField(InternalRow row, int pos, String typeName) {
+        boolean isNull = row.isNullAt(pos);
+        switch (typeName) {
+            case Types.TYPE_NAME_BOOLEAN:
+                return isNull ? null : row.getBoolean(pos);
+            case Types.TYPE_NAME_BYTE:
+                return isNull ? null : row.getByte(pos);
+            case Types.TYPE_NAME_DOUBLE:
+                return isNull ? null : row.getDouble(pos);
+            case Types.TYPE_NAME_FLOAT:
+                return isNull ? null : row.getFloat(pos);
+            case Types.TYPE_NAME_INTEGER:
+                return isNull ? null : row.getInt(pos);
+            case Types.TYPE_NAME_LONG:
+                return isNull ? null : row.getLong(pos);
+            case Types.TYPE_NAME_STRING:
+                // getString returns Paimon's own binary string type, not a java.lang.String.
+                return isNull ? null : row.getString(pos).toString();
+            case Types.TYPE_NAME_BINARY_STRING:
+                // NOTE(review): this stores Paimon's BinaryString as-is; it presumably should be
+                // converted to GeaFlow's binary string type - confirm against downstream operators.
+                return isNull ? null : row.getString(pos);
+            default:
+                throw new GeaFlowDSLException("Type: {} not support", typeName);
+        }
+    }
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnector.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnector.java
new file mode 100644
index 00000000..deda9383
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.geaflow.dsl.connector.paimon; + +import org.apache.geaflow.common.config.Configuration; +import org.apache.geaflow.dsl.connector.api.TableReadableConnector; +import org.apache.geaflow.dsl.connector.api.TableSource; + +public class PaimonTableConnector implements TableReadableConnector { + + private static final String PAIMON = "PAIMON"; + + @Override + public String getType() { + return PAIMON; + } + + @Override + public TableSource createSource(Configuration conf) { + return new PaimonTableSource(); + } +} diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java new file mode 100644 index 00000000..e54aee53 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableSource.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.geaflow.dsl.connector.paimon;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.commons.lang.StringUtils;
+import org.apache.geaflow.api.context.RuntimeContext;
+import org.apache.geaflow.api.window.WindowType;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.config.keys.DSLConfigKeys;
+import org.apache.geaflow.common.utils.GsonUtil;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.common.types.TableSchema;
+import org.apache.geaflow.dsl.common.util.Windows;
+import org.apache.geaflow.dsl.connector.api.FetchData;
+import org.apache.geaflow.dsl.connector.api.Offset;
+import org.apache.geaflow.dsl.connector.api.Partition;
+import org.apache.geaflow.dsl.connector.api.TableSource;
+import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
+import org.apache.geaflow.dsl.connector.api.window.FetchWindow;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Catalog.TableNotExistException;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.CloseableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Table source that reads the rows of a Paimon table resolved through a Paimon catalog.
+ *
+ * <p>The catalog is created from the configured warehouse path; when that path is blank it is
+ * created from the catalog options JSON, optionally combined with Hadoop configuration entries
+ * taken from the configuration JSON. Every Paimon split is exposed as one {@link Partition}.
+ */
+public class PaimonTableSource implements TableSource {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PaimonTableSource.class);
+
+    private Configuration tableConf;
+    private TableSchema tableSchema;
+    private boolean isAllWindow;
+
+    private String path;
+    private Map<String, String> options;
+    private String configJson;
+    private Map<String, String> configs;
+    private String database;
+    private String tableName;
+
+    private transient CatalogContext catalogContext;
+    private transient Catalog catalog;
+    private transient ReadBuilder readBuilder;
+    private transient Map<PaimonPartition, RecordReader<InternalRow>> partition2Reader;
+    // One iterator per reader: an iterator buffers rows, so it must outlive a single fetch call.
+    private transient Map<PaimonPartition, CloseableIterator<InternalRow>> partition2Iterator;
+    private transient Map<PaimonPartition, PaimonOffset> partition2InnerOffset;
+
+    /**
+     * Reads the connector settings from the table configuration.
+     *
+     * <p>The options JSON and configuration JSON are only consulted when no warehouse path is
+     * configured.
+     *
+     * @param tableConf   the table configuration
+     * @param tableSchema the schema of the table being read
+     */
+    @Override
+    public void init(Configuration tableConf, TableSchema tableSchema) {
+        this.tableConf = tableConf;
+        this.isAllWindow = tableConf.getLong(DSLConfigKeys.GEAFLOW_DSL_WINDOW_SIZE) == Windows.SIZE_OF_ALL_WINDOW;
+        this.tableSchema = tableSchema;
+        this.path = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_WAREHOUSE, "");
+        this.options = new HashMap<>();
+        this.configs = new HashMap<>();
+        if (StringUtils.isBlank(this.path)) {
+            String optionJson = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_OPTIONS_JSON);
+            Map<String, String> userOptions = GsonUtil.parse(optionJson);
+            if (userOptions != null) {
+                options.putAll(userOptions);
+            }
+            this.configJson =
+                tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_CONFIGURATION_JSON, "");
+            if (!StringUtils.isBlank(configJson)) {
+                Map<String, String> userConfig = GsonUtil.parse(configJson);
+                if (userConfig != null) {
+                    configs.putAll(userConfig);
+                }
+            }
+        }
+        this.database = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_DATABASE_NAME);
+        this.tableName = tableConf.getString(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_TABLE_NAME);
+    }
+
+    /**
+     * Lists the partitions of the table; the requested parallelism is ignored.
+     */
+    @Override
+    public List<Partition> listPartitions(int parallelism) {
+        return listPartitions();
+    }
+
+    /**
+     * Creates the catalog, resolves the configured table and prepares the per-partition caches.
+     *
+     * @throws GeaFlowDSLException if the configured table does not exist
+     */
+    @Override
+    public void open(RuntimeContext context) {
+        if (StringUtils.isBlank(this.path)) {
+            if (StringUtils.isBlank(this.configJson)) {
+                this.catalogContext =
+                    Objects.requireNonNull(CatalogContext.create(new Options(options)));
+            } else {
+                org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+                for (Map.Entry<String, String> entry : configs.entrySet()) {
+                    hadoopConf.set(entry.getKey(), entry.getValue());
+                }
+                this.catalogContext =
+                    Objects.requireNonNull(CatalogContext.create(new Options(options), hadoopConf));
+            }
+        } else {
+            this.catalogContext = Objects.requireNonNull(CatalogContext.create(new Path(path)));
+        }
+        this.catalog = Objects.requireNonNull(CatalogFactory.createCatalog(this.catalogContext));
+        Identifier identifier = Identifier.create(database, tableName);
+        try {
+            this.readBuilder = Objects.requireNonNull(catalog.getTable(identifier).newReadBuilder());
+        } catch (TableNotExistException e) {
+            // Same message as before, but the original exception is kept as the cause.
+            throw new GeaFlowDSLException(
+                String.format("Table: %s in db: %s not exists.", tableName, database), e);
+        }
+        this.partition2Reader = new HashMap<>();
+        this.partition2Iterator = new HashMap<>();
+        this.partition2InnerOffset = new HashMap<>();
+        LOGGER.info("Open paimon source, tableConf: {}, tableSchema: {}, path: {}, options: "
+                + "{}, configs: {}, database: {}, tableName: {}", tableConf, tableSchema, path,
+            options, configs, database, tableName);
+    }
+
+    /**
+     * Plans a scan (batch scan for the all-window mode, stream scan otherwise) and wraps every
+     * resulting split into a {@link PaimonPartition}.
+     */
+    @Override
+    public List<Partition> listPartitions() {
+        List<Split> splits = isAllWindow ? readBuilder.newScan().plan().splits() :
+            readBuilder.newStreamScan().plan().splits();
+        return splits.stream()
+            .map(split -> new PaimonPartition(database, tableName, split))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns a new {@link PaimonRecordDeserializer}.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
+        return (TableDeserializer<IN>) new PaimonRecordDeserializer();
+    }
+
+    /**
+     * Fetches data of one partition.
+     *
+     * <p>For an all-window the whole iterator is handed out; for a size tumbling window at most
+     * {@code windowInfo.windowSize()} rows are read and the tracked offset advances by the
+     * number of rows read.
+     *
+     * @param partition   the partition to read; must be a {@link PaimonPartition}
+     * @param startOffset if present, must equal the offset this source tracks for the partition
+     * @param windowInfo  the window describing how much to fetch
+     * @return the fetched data together with the next offset
+     * @throws IOException         if the Paimon reader cannot be created
+     * @throws GeaFlowDSLException if the start offset differs from the tracked offset or the
+     *                             window type is not supported
+     */
+    @Override
+    public <T> FetchData fetch(Partition partition, Optional<Offset> startOffset, FetchWindow windowInfo)
+        throws IOException {
+        PaimonPartition paimonPartition = (PaimonPartition) partition;
+        assert paimonPartition.getDatabase().equals(this.database)
+            && paimonPartition.getTable().equals(this.tableName);
+
+        PaimonOffset innerOffset =
+            partition2InnerOffset.computeIfAbsent(paimonPartition, key -> new PaimonOffset());
+        if (startOffset.isPresent() && !startOffset.get().equals(innerOffset)) {
+            throw new GeaFlowDSLException("Paimon connector not support reset offset.");
+        }
+
+        CloseableIterator<InternalRow> iterator = getOrCreateIterator(paimonPartition);
+        if (windowInfo.getType() == WindowType.ALL_WINDOW) {
+            return FetchData.createBatchFetch(iterator, new PaimonOffset());
+        } else if (windowInfo.getType() == WindowType.SIZE_TUMBLING_WINDOW) {
+            // NOTE(review): Paimon readers may hand out reused row instances; confirm whether
+            // rows have to be copied before they are buffered in this list.
+            List<Object> readContents = new ArrayList<>();
+            long fetched = 0;
+            while (fetched < windowInfo.windowSize() && iterator.hasNext()) {
+                readContents.add(iterator.next());
+                fetched++;
+            }
+            PaimonOffset nextOffset = new PaimonOffset(innerOffset.getOffset() + fetched);
+            // Remember the position so the next fetch starting at nextOffset is accepted.
+            partition2InnerOffset.put(paimonPartition, nextOffset);
+            boolean isFinished = !iterator.hasNext();
+            return FetchData.createStreamFetch(readContents, nextOffset, isFinished);
+        } else {
+            throw new GeaFlowDSLException("Paimon not support window:{}", windowInfo.getType());
+        }
+    }
+
+    /**
+     * Returns the cached iterator of the partition, creating the reader and its iterator on
+     * first use only, so that no reader is opened and then dropped without being closed.
+     */
+    private CloseableIterator<InternalRow> getOrCreateIterator(PaimonPartition partition)
+        throws IOException {
+        CloseableIterator<InternalRow> iterator = partition2Iterator.get(partition);
+        if (iterator == null) {
+            RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(partition.getSplit());
+            partition2Reader.put(partition, reader);
+            iterator = reader.toCloseableIterator();
+            partition2Iterator.put(partition, iterator);
+        }
+        return iterator;
+    }
+
+    /**
+     * Closes every reader opened by this source and clears the per-partition caches.
+     *
+     * <p>All readers are attempted even if one fails; the first failure is rethrown afterwards.
+     * NOTE(review): the Paimon catalog created in {@code open} is not closed here - confirm
+     * whether it holds resources that need releasing.
+     *
+     * @throws GeaFlowDSLException if closing a reader failed
+     */
+    @Override
+    public void close() {
+        if (partition2Reader == null) {
+            // open() was never called, nothing to release.
+            return;
+        }
+        Exception firstFailure = null;
+        for (Map.Entry<PaimonPartition, RecordReader<InternalRow>> entry : partition2Reader.entrySet()) {
+            // Closing the iterator also closes the reader it wraps, so only one of them is closed.
+            AutoCloseable resource = partition2Iterator.get(entry.getKey());
+            if (resource == null) {
+                resource = entry.getValue();
+            }
+            if (resource == null) {
+                continue;
+            }
+            try {
+                resource.close();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        partition2Reader.clear();
+        partition2Iterator.clear();
+        partition2InnerOffset.clear();
+        if (firstFailure != null) {
+            throw new GeaFlowDSLException("Error occurs when close paimon reader.", firstFailure);
+        }
+    }
+
+    /**
+     * A partition backed by one Paimon {@link Split} of a table.
+     */
+    public static class PaimonPartition implements Partition {
+
+        private final String database;
+        private final String table;
+        private final Split split;
+
+        public PaimonPartition(String database, String table, Split split) {
+            this.database = Objects.requireNonNull(database);
+            this.table = Objects.requireNonNull(table);
+            this.split = Objects.requireNonNull(split);
+        }
+
+        // NOTE(review): every split of the same table yields the same name; confirm that
+        // callers do not rely on partition names being unique.
+        @Override
+        public String getName() {
+            return database + "-" + table;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(database, table, split);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof PaimonPartition)) {
+                return false;
+            }
+            PaimonPartition that = (PaimonPartition) o;
+            return Objects.equals(database, that.database)
+                && Objects.equals(table, that.table)
+                && Objects.equals(split, that.split);
+        }
+
+        public String getDatabase() {
+            return database;
+        }
+
+        public String getTable() {
+            return table;
+        }
+
+        public Split getSplit() {
+            return split;
+        }
+
+        /** Intentionally a no-op: the index and parallelism are not used by this partition. */
+        @Override
+        public void setIndex(int index, int parallel) {
+        }
+    }
+
+
+    /**
+     * Offset counting the number of rows already read from a partition.
+     */
+    public static class PaimonOffset implements Offset {
+
+        private final long offset;
+
+
+        public PaimonOffset() {
+            this.offset = 0L;
+        }
+
+        public PaimonOffset(long offset) {
+            this.offset = offset;
+        }
+
+        @Override
+        public String humanReadable() {
+            return String.valueOf(offset);
+        }
+
+        @Override
+        public long getOffset() {
+            return offset;
+        }
+
+        @Override
+        public boolean isTimestamp() {
+            return false;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PaimonOffset that = (PaimonOffset) o;
+            return offset == that.offset;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(offset);
+        }
+    }
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
new file mode 100644
index 00000000..5b7cbce0
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+ +org.apache.geaflow.dsl.connector.paimon.PaimonTableConnector \ No newline at end of file diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java new file mode 100644 index 00000000..84746b46 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-paimon/src/test/java/org/apache/geaflow/dsl/connector/paimon/PaimonTableConnectorTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.geaflow.dsl.connector.paimon; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.geaflow.common.config.Configuration; +import org.apache.geaflow.common.config.keys.ConnectorConfigKeys; +import org.apache.geaflow.common.type.Types; +import org.apache.geaflow.dsl.common.data.Row; +import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException; +import org.apache.geaflow.dsl.common.types.StructType; +import org.apache.geaflow.dsl.common.types.TableField; +import org.apache.geaflow.dsl.common.types.TableSchema; +import org.apache.geaflow.dsl.connector.api.FetchData; +import org.apache.geaflow.dsl.connector.api.Partition; +import org.apache.geaflow.dsl.connector.api.TableConnector; +import org.apache.geaflow.dsl.connector.api.TableReadableConnector; +import org.apache.geaflow.dsl.connector.api.TableSource; +import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer; +import org.apache.geaflow.dsl.connector.api.util.ConnectorFactory; +import org.apache.geaflow.dsl.connector.api.window.AllFetchWindow; +import org.apache.geaflow.runtime.core.context.DefaultRuntimeContext; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import 
org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.VarCharType; +import org.testng.Assert; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +public class PaimonTableConnectorTest { + + private final StructType dataSchema = new StructType( + new TableField("id", Types.INTEGER, false), + new TableField("name", Types.BINARY_STRING), + new TableField("price", Types.DOUBLE) + ); + + private final StructType partitionSchema = new StructType( + new TableField("dt", Types.BINARY_STRING, false) + ); + + private final TableSchema tableSchema = new TableSchema(dataSchema, partitionSchema); + + + @BeforeTest + public void prepare() { + String tmpDir = "/tmp/geaflow/dsl/paimon/test/"; + FileUtils.deleteQuietly(new File(tmpDir)); + String db = "paimon_db"; + String tableName = "paimon_table"; + CatalogContext catalogContext = + Objects.requireNonNull(CatalogContext.create(new Path(tmpDir))); + Catalog catalog = Objects.requireNonNull(CatalogFactory.createCatalog(catalogContext)); + try { + catalog.createDatabase(db, false); + List<String> dbs = catalog.listDatabases(); + assert dbs.get(0).equals(db); + Identifier identifier = new Identifier(db, tableName); + catalog.createTable(identifier, + Schema.newBuilder() + .column("id", new IntType()) + .column("name", new VarCharType(256)) + .column("price", new DoubleType()) + .build(), false); + List<String> tables = catalog.listTables(dbs.get(0)); + assert tables.get(0).equals(tableName); + Table table = catalog.getTable(identifier); + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + BatchTableWrite write = writeBuilder.newWrite(); + GenericRow record1 = GenericRow.of(1, BinaryString.fromString("a1"), 10.0); + GenericRow record2 = GenericRow.of(2, BinaryString.fromString("ab"), 12.0); + GenericRow record3 = GenericRow.of(3, BinaryString.fromString("a3"), 12.0); + GenericRow 
record4 = GenericRow.of(4, BinaryString.fromString("bcd"), 15.0); + GenericRow record5 = GenericRow.of(5, BinaryString.fromString("a5"), 10.0); + write.write(record1); + write.write(record2); + write.write(record3); + write.write(record4); + write.write(record5); + List<CommitMessage> messages = write.prepareCommit(); + BatchTableCommit commit = writeBuilder.newCommit(); + commit.commit(messages); + } catch (Exception e) { + throw new GeaFlowDSLException("Test error.", e); + } + } + + @Test + public void testReadPaimon() throws IOException { + String tmpDir = "/tmp/geaflow/dsl/paimon/test/"; + String db = "paimon_db"; + String table = "paimon_table"; + + TableConnector tableConnector = ConnectorFactory.loadConnector("PAIMON"); + Assert.assertEquals(tableConnector.getType().toLowerCase(Locale.ROOT), "paimon"); + TableReadableConnector readableConnector = (TableReadableConnector) tableConnector; + + Map<String, String> tableConfMap = new HashMap<>(); + tableConfMap.put(ConnectorConfigKeys.GEAFLOW_DSL_FILE_PATH.getKey(), tmpDir); + tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_WAREHOUSE.getKey(), tmpDir); + tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_DATABASE_NAME.getKey(), db); + tableConfMap.put(PaimonConfigKeys.GEAFLOW_DSL_PAIMON_TABLE_NAME.getKey(), table); + Configuration tableConf = new Configuration(tableConfMap); + TableSource tableSource = readableConnector.createSource(tableConf); + tableSource.init(tableConf, tableSchema); + + tableSource.open(new DefaultRuntimeContext(tableConf)); + + List<Partition> partitions = tableSource.listPartitions(); + + TableDeserializer deserializer = tableSource.getDeserializer(tableConf); + deserializer.init(tableConf, tableSchema); + List<Row> readRows = new ArrayList<>(); + for (Partition partition : partitions) { + FetchData<Object> rows = tableSource.fetch(partition, Optional.empty(), new AllFetchWindow(-1L)); + while (rows.getDataIterator().hasNext()) { + 
readRows.addAll(deserializer.deserialize(rows.getDataIterator().next())); + } + } + Assert.assertEquals(StringUtils.join(readRows, "\n"), + "[1, a1, 10.0]\n" + + "[2, ab, 12.0]\n" + + "[3, a3, 12.0]\n" + + "[4, bcd, 15.0]\n" + + "[5, a5, 10.0]"); + } + +} diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml b/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml index 9ef2186d..b7c9821a 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/pom.xml @@ -46,6 +46,7 @@ <module>geaflow-dsl-connector-hbase</module> <module>geaflow-dsl-connector-pulsar</module> <module>geaflow-dsl-connector-random</module> + <module>geaflow-dsl-connector-paimon</module> </modules> <dependencyManagement> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
