This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 9cbb9ae7c [AMORO-3698] Remove Flink 1.15 modules (#3771)
9cbb9ae7c is described below
commit 9cbb9ae7c168ee0c45e274c595d4d4df0ec83fb1
Author: Yaguang Jia <[email protected]>
AuthorDate: Thu Sep 11 20:22:11 2025 +0800
[AMORO-3698] Remove Flink 1.15 modules (#3771)
---
README.md | 2 +-
amoro-format-mixed/amoro-mixed-flink/pom.xml | 2 -
.../v1.15/amoro-mixed-flink-1.15/pom.xml | 109 ------
.../lookup/MixedFormatRowDataLookupFunction.java | 81 -----
.../flink/table/MixedFormatDynamicSource.java | 384 ---------------------
.../v1.15/amoro-mixed-flink-runtime-1.15/pom.xml | 253 --------------
docs/_index.md | 12 +-
7 files changed, 7 insertions(+), 836 deletions(-)
diff --git a/README.md b/README.md
index 65f46df7b..6e1944bac 100644
--- a/README.md
+++ b/README.md
@@ -82,7 +82,7 @@ Amoro support multiple processing engines for Mixed format as
below:
| Processing Engine | Version | Batch Read | Batch Write |
Batch Overwrite | Streaming Read | Streaming Write | Create Table | Alter Table
|
|-------------------|------------------------|-------------|-------------|-----------------|----------------|-----------------|--------------|-------------|
-| Flink | 1.15.x, 1.16.x, 1.17.x | ✔ | ✔ |
✖ | ✔ | ✔ | ✔ | ✖
|
+| Flink | 1.16.x, 1.17.x, 1.18.x | ✔ | ✔ |
✖ | ✔ | ✔ | ✔ | ✖
|
| Spark | 3.1, 3.2, 3.3 | ✔ | ✔ |
✔ | ✖ | ✖ | ✔ | ✔
|
| Hive | 2.x, 3.x | ✔ | ✖ |
✔ | ✖ | ✖ | ✖ | ✔ |
| Trino | 406 | ✔ | ✖ |
✔ | ✖ | ✖ | ✖ | ✔ |
diff --git a/amoro-format-mixed/amoro-mixed-flink/pom.xml
b/amoro-format-mixed/amoro-mixed-flink/pom.xml
index 8f8980ec1..f9028321e 100644
--- a/amoro-format-mixed/amoro-mixed-flink/pom.xml
+++ b/amoro-format-mixed/amoro-mixed-flink/pom.xml
@@ -37,8 +37,6 @@
<module>amoro-mixed-flink-common</module>
<module>amoro-mixed-flink-common-format</module>
<module>amoro-mixed-flink-common-iceberg-bridge</module>
- <module>v1.15/amoro-mixed-flink-1.15</module>
- <module>v1.15/amoro-mixed-flink-runtime-1.15</module>
<module>v1.16/amoro-mixed-flink-1.16</module>
<module>v1.16/amoro-mixed-flink-runtime-1.16</module>
<module>v1.17/amoro-mixed-flink-1.17</module>
diff --git
a/amoro-format-mixed/amoro-mixed-flink/v1.15/amoro-mixed-flink-1.15/pom.xml
b/amoro-format-mixed/amoro-mixed-flink/v1.15/amoro-mixed-flink-1.15/pom.xml
deleted file mode 100644
index 78c876e20..000000000
--- a/amoro-format-mixed/amoro-mixed-flink/v1.15/amoro-mixed-flink-1.15/pom.xml
+++ /dev/null
@@ -1,109 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.amoro</groupId>
- <artifactId>amoro-mixed-flink</artifactId>
- <version>0.9-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <artifactId>amoro-format-mixed-flink-1.15</artifactId>
- <name>Amoro Project Mixed Format Flink 1.15</name>
- <url>https://amoro.apache.org</url>
-
- <packaging>jar</packaging>
-
- <properties>
- <iceberg.version>1.4.3</iceberg.version>
- <kafka.version>2.8.1</kafka.version>
- <assertj.version>3.21.0</assertj.version>
- <testcontainers.version>1.17.2</testcontainers.version>
- <flink.version>1.15.3</flink.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.amoro</groupId>
- <artifactId>amoro-mixed-flink-common</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.iceberg</groupId>
- <artifactId>iceberg-flink-1.15</artifactId>
- <version>${iceberg.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.parquet</groupId>
- <artifactId>parquet-column</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.parquet</groupId>
- <artifactId>parquet-avro</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.paimon</groupId>
- <artifactId>paimon-flink-1.15</artifactId>
- <version>${paimon.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java-bridge</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <id>shade-amoro</id>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <artifactSet>
- <includes combine.children="append">
-
<include>org.apache.amoro:amoro-format-mixed-flink-common</include>
- </includes>
- </artifactSet>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git
a/amoro-format-mixed/amoro-mixed-flink/v1.15/amoro-mixed-flink-1.15/src/main/java/org/apache/amoro/flink/lookup/MixedFormatRowDataLookupFunction.java
b/amoro-format-mixed/amoro-mixed-flink/v1.15/amoro-mixed-flink-1.15/src/main/java/org/apache/amoro/flink/lookup/MixedFormatRowDataLookupFunction.java
deleted file mode 100644
index 1a508d930..000000000
---
a/amoro-format-mixed/amoro-mixed-flink/v1.15/amoro-mixed-flink-1.15/src/main/java/org/apache/amoro/flink/lookup/MixedFormatRowDataLookupFunction.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.flink.lookup;
-
-import org.apache.amoro.flink.read.hybrid.reader.DataIteratorReaderFunction;
-import org.apache.amoro.flink.table.MixedFormatTableLoader;
-import org.apache.amoro.hive.io.reader.AbstractAdaptHiveKeyedDataReader;
-import org.apache.amoro.table.MixedTable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.expressions.Expression;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.function.Predicate;
-
-/** A lookup function for {@link RowData} type. */
-public class MixedFormatRowDataLookupFunction extends TableFunction<RowData> {
- private static final long serialVersionUID = -7694050999266540499L;
- private final BasicLookupFunction<RowData> basicLookupFunction;
-
- public MixedFormatRowDataLookupFunction(
- TableFactory<RowData> tableFactory,
- MixedTable mixedTable,
- List<String> joinKeys,
- Schema projectSchema,
- List<Expression> filters,
- MixedFormatTableLoader tableLoader,
- Configuration config,
- Predicate<RowData> predicate,
- AbstractAdaptHiveKeyedDataReader<RowData> flinkMORDataReader,
- DataIteratorReaderFunction<RowData> readerFunction) {
- this.basicLookupFunction =
- new BasicLookupFunction<>(
- tableFactory,
- mixedTable,
- joinKeys,
- projectSchema,
- filters,
- tableLoader,
- config,
- predicate,
- flinkMORDataReader,
- readerFunction);
- }
-
- @Override
- public void open(FunctionContext context) throws IOException {
- basicLookupFunction.open(context);
- }
-
- public void eval(Object... rowKey) throws IOException {
- List<RowData> results =
basicLookupFunction.lookup(GenericRowData.of(rowKey));
- results.forEach(this::collect);
- }
-
- @Override
- public void close() throws Exception {
- basicLookupFunction.close();
- }
-}
diff --git
a/amoro-format-mixed/amoro-mixed-flink/v1.15/amoro-mixed-flink-1.15/src/main/java/org/apache/amoro/flink/table/MixedFormatDynamicSource.java
b/amoro-format-mixed/amoro-mixed-flink/v1.15/amoro-mixed-flink-1.15/src/main/java/org/apache/amoro/flink/table/MixedFormatDynamicSource.java
deleted file mode 100644
index 4574c2121..000000000
---
a/amoro-format-mixed/amoro-mixed-flink/v1.15/amoro-mixed-flink-1.15/src/main/java/org/apache/amoro/flink/table/MixedFormatDynamicSource.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.flink.table;
-
-import org.apache.amoro.flink.lookup.KVTableFactory;
-import org.apache.amoro.flink.lookup.MixedFormatRowDataLookupFunction;
-import org.apache.amoro.flink.lookup.filter.RowDataPredicate;
-import org.apache.amoro.flink.lookup.filter.RowDataPredicateExpressionVisitor;
-import org.apache.amoro.flink.read.hybrid.reader.DataIteratorReaderFunction;
-import org.apache.amoro.flink.read.hybrid.reader.RowDataReaderFunction;
-import org.apache.amoro.flink.read.source.FlinkKeyedMORDataReader;
-import org.apache.amoro.flink.util.FilterUtil;
-import org.apache.amoro.flink.util.IcebergAndFlinkFilters;
-import org.apache.amoro.hive.io.reader.AbstractAdaptHiveKeyedDataReader;
-import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
-import org.apache.amoro.table.MixedTable;
-import org.apache.amoro.utils.SchemaUtil;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.source.DataStreamScanProvider;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.LookupTableSource;
-import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
-import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
-import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
-import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
-import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.expressions.CallExpression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.FunctionIdentifier;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.flink.data.RowDataUtil;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.Types;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.BiFunction;
-import java.util.stream.Collectors;
-
-/** Flink table api that generates source operators. */
-public class MixedFormatDynamicSource
- implements ScanTableSource,
- SupportsFilterPushDown,
- SupportsProjectionPushDown,
- SupportsLimitPushDown,
- SupportsWatermarkPushDown,
- LookupTableSource {
-
- private static final Logger LOG =
LoggerFactory.getLogger(MixedFormatDynamicSource.class);
-
- protected final String tableName;
-
- protected final ScanTableSource mixedFormatDynamicSource;
- protected final MixedTable mixedTable;
- protected final Map<String, String> properties;
-
- protected int[] projectFields;
- protected List<Expression> filters;
- protected ResolvedExpression flinkExpression;
- protected final MixedFormatTableLoader tableLoader;
-
- @Nullable protected WatermarkStrategy<RowData> watermarkStrategy;
-
- /**
- * @param tableName tableName
- * @param mixedFormatDynamicSource underlying source
- * @param mixedTable mixedTable
- * @param properties With all mixed-format table properties and sql options
- * @param tableLoader
- */
- public MixedFormatDynamicSource(
- String tableName,
- ScanTableSource mixedFormatDynamicSource,
- MixedTable mixedTable,
- Map<String, String> properties,
- MixedFormatTableLoader tableLoader) {
- this.tableName = tableName;
- this.mixedFormatDynamicSource = mixedFormatDynamicSource;
- this.mixedTable = mixedTable;
- this.properties = properties;
- this.tableLoader = tableLoader;
- }
-
- public MixedFormatDynamicSource(
- String tableName,
- ScanTableSource mixedFormatDynamicSource,
- MixedTable mixedTable,
- Map<String, String> properties,
- MixedFormatTableLoader tableLoader,
- int[] projectFields,
- List<Expression> filters,
- ResolvedExpression flinkExpression) {
- this.tableName = tableName;
- this.mixedFormatDynamicSource = mixedFormatDynamicSource;
- this.mixedTable = mixedTable;
- this.properties = properties;
- this.tableLoader = tableLoader;
- this.projectFields = projectFields;
- this.filters = filters;
- this.flinkExpression = flinkExpression;
- }
-
- @Override
- public ChangelogMode getChangelogMode() {
- return mixedFormatDynamicSource.getChangelogMode();
- }
-
- @Override
- public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
- ScanRuntimeProvider origin =
mixedFormatDynamicSource.getScanRuntimeProvider(scanContext);
- Preconditions.checkArgument(
- origin instanceof DataStreamScanProvider,
- "file or log ScanRuntimeProvider should be DataStreamScanProvider, but
provided is "
- + origin.getClass());
- return origin;
- }
-
- @Override
- public DynamicTableSource copy() {
- return new MixedFormatDynamicSource(
- tableName,
- mixedFormatDynamicSource,
- mixedTable,
- properties,
- tableLoader,
- projectFields,
- filters,
- flinkExpression);
- }
-
- @Override
- public String asSummaryString() {
- return "Mixed-format Dynamic Source";
- }
-
- @Override
- public Result applyFilters(List<ResolvedExpression> filters) {
- IcebergAndFlinkFilters icebergAndFlinkFilters =
- FilterUtil.convertFlinkExpressToIceberg(filters);
- this.filters = icebergAndFlinkFilters.expressions();
-
- if (filters.size() == 1) {
- flinkExpression = filters.get(0);
- } else if (filters.size() >= 2) {
- flinkExpression = and(filters.get(0), filters.get(1));
- for (int i = 2; i < filters.size(); i++) {
- flinkExpression = and(flinkExpression, filters.subList(i, i +
1).get(0));
- }
- }
-
- if (mixedFormatDynamicSource instanceof SupportsFilterPushDown) {
- return ((SupportsFilterPushDown)
mixedFormatDynamicSource).applyFilters(filters);
- } else {
- return Result.of(Collections.emptyList(), filters);
- }
- }
-
- @Override
- public boolean supportsNestedProjection() {
- if (mixedFormatDynamicSource instanceof SupportsProjectionPushDown) {
- return ((SupportsProjectionPushDown)
mixedFormatDynamicSource).supportsNestedProjection();
- } else {
- return false;
- }
- }
-
- protected CallExpression and(ResolvedExpression left, ResolvedExpression
right) {
- return CallExpression.permanent(
- FunctionIdentifier.of(BuiltInFunctionDefinitions.AND.getName()),
- BuiltInFunctionDefinitions.AND,
- Arrays.asList(left, right),
- DataTypes.BOOLEAN());
- }
-
- @Override
- public void applyProjection(int[][] projectedFields, DataType
producedDataType) {
- projectFields = new int[projectedFields.length];
- for (int i = 0; i < projectedFields.length; i++) {
- Preconditions.checkArgument(
- projectedFields[i].length == 1, "Don't support nested projection
now.");
- projectFields[i] = projectedFields[i][0];
- }
-
- if (mixedFormatDynamicSource instanceof SupportsProjectionPushDown) {
- ((SupportsProjectionPushDown) mixedFormatDynamicSource)
- .applyProjection(projectedFields, producedDataType);
- }
- }
-
- @Override
- public void applyLimit(long newLimit) {
- if (mixedFormatDynamicSource instanceof SupportsLimitPushDown) {
- ((SupportsLimitPushDown) mixedFormatDynamicSource).applyLimit(newLimit);
- }
- }
-
- @Override
- public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
- if (mixedFormatDynamicSource instanceof SupportsWatermarkPushDown) {
- ((SupportsWatermarkPushDown)
mixedFormatDynamicSource).applyWatermark(watermarkStrategy);
- }
- }
-
- @Override
- public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context)
{
- int[] joinKeys = new int[context.getKeys().length];
- for (int i = 0; i < context.getKeys().length; i++) {
- Preconditions.checkArgument(
- context.getKeys()[i].length == 1,
- "Mixed-format lookup join doesn't support the row field as a joining
key.");
- joinKeys[i] = context.getKeys()[i][0];
- }
-
- return TableFunctionProvider.of(getLookupFunction(joinKeys));
- }
-
- protected TableFunction<RowData> getLookupFunction(int[] joinKeys) {
- Schema projectedSchema = getProjectedSchema();
-
- List<String> joinKeyNames = getJoinKeyNames(joinKeys, projectedSchema);
-
- Configuration config = new Configuration();
- properties.forEach(config::setString);
-
- Optional<RowDataPredicate> rowDataPredicate =
- generatePredicate(projectedSchema, flinkExpression);
-
- AbstractAdaptHiveKeyedDataReader<RowData> flinkArcticMORDataReader =
- generateMORReader(mixedTable, projectedSchema);
- DataIteratorReaderFunction<RowData> readerFunction =
- generateReaderFunction(mixedTable, projectedSchema);
-
- return new MixedFormatRowDataLookupFunction(
- KVTableFactory.INSTANCE,
- mixedTable,
- joinKeyNames,
- projectedSchema,
- filters,
- tableLoader,
- config,
- rowDataPredicate.orElse(null),
- flinkArcticMORDataReader,
- readerFunction);
- }
-
- protected DataIteratorReaderFunction<RowData> generateReaderFunction(
- MixedTable mixedTable, Schema projectedSchema) {
- return new RowDataReaderFunction(
- new Configuration(),
- mixedTable.schema(),
- projectedSchema,
- mixedTable.asKeyedTable().primaryKeySpec(),
- null,
- true,
- mixedTable.io(),
- true);
- }
-
- protected AbstractAdaptHiveKeyedDataReader<RowData> generateMORReader(
- MixedTable mixedTable, Schema projectedSchema) {
- BiFunction<Type, Object, Object> convertConstant = new ConvertTask();
-
- return new FlinkKeyedMORDataReader(
- mixedTable.io(),
- mixedTable.schema(),
- projectedSchema,
- mixedTable.asKeyedTable().primaryKeySpec(),
- null,
- true,
- convertConstant,
- true);
- }
-
- static class ConvertTask implements BiFunction<Type, Object, Object>,
Serializable {
- private static final long serialVersionUID = 4607513893568225789L;
-
- @Override
- public Object apply(Type t, Object u) {
- return RowDataUtil.convertConstant(t, u);
- }
- }
-
- protected List<String> getJoinKeyNames(int[] joinKeys, Schema
projectedSchema) {
- return Arrays.stream(joinKeys)
- .mapToObj(index -> projectedSchema.columns().get(index).name())
- .collect(Collectors.toList());
- }
-
- protected Schema getProjectedSchema() {
- Schema mixedFormatTableSchema = mixedTable.schema();
- Schema projectedSchema;
- if (projectFields == null) {
- LOG.info("The projected fields is null.");
- projectedSchema = mixedTable.schema();
- } else {
- if (mixedTable.isUnkeyedTable()) {
- throw new UnsupportedOperationException("Unkeyed table doesn't support
lookup join.");
- }
- List<String> primaryKeys =
mixedTable.asKeyedTable().primaryKeySpec().fieldNames();
- List<Integer> projectFieldList =
- Arrays.stream(projectFields).boxed().collect(Collectors.toList());
- List<Types.NestedField> columns = mixedFormatTableSchema.columns();
- for (int i = 0; i < mixedFormatTableSchema.columns().size(); i++) {
- if (primaryKeys.contains(columns.get(i).name()) &&
!projectFieldList.contains(i)) {
- projectFieldList.add(i);
- LOG.info(
- "Add identifier field {} to projected schema, due to this field
is mismatched.",
- columns.get(i).name());
- }
- }
-
- List<String> projectedFieldNames =
- projectFieldList.stream()
- .map(index -> columns.get(index).name())
- .collect(Collectors.toList());
- projectedSchema = SchemaUtil.selectInOrder(mixedFormatTableSchema,
projectedFieldNames);
- LOG.info("The projected schema {}.\n table schema {}.", projectedSchema,
mixedTable.schema());
- }
- return projectedSchema;
- }
-
- protected Optional<RowDataPredicate> generatePredicate(
- final Schema projectedSchema, final ResolvedExpression flinkExpression) {
- if (flinkExpression == null) {
- return Optional.empty();
- }
-
- final Map<String, Integer> fieldIndexMap = new HashMap<>();
- final Map<String, DataType> fieldDataTypeMap = new HashMap<>();
- List<Types.NestedField> fields = projectedSchema.asStruct().fields();
- for (int i = 0; i < fields.size(); i++) {
- Types.NestedField field = fields.get(i);
- fieldIndexMap.put(field.name(), i);
- fieldDataTypeMap.put(
- field.name(),
-
TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(field.type())));
- }
-
- RowDataPredicateExpressionVisitor visitor =
- generateExpressionVisitor(fieldIndexMap, fieldDataTypeMap);
- return flinkExpression.accept(visitor);
- }
-
- protected RowDataPredicateExpressionVisitor generateExpressionVisitor(
- Map<String, Integer> fieldIndexMap, Map<String, DataType>
fieldDataTypeMap) {
- return new RowDataPredicateExpressionVisitor(fieldIndexMap,
fieldDataTypeMap);
- }
-}
diff --git
a/amoro-format-mixed/amoro-mixed-flink/v1.15/amoro-mixed-flink-runtime-1.15/pom.xml
b/amoro-format-mixed/amoro-mixed-flink/v1.15/amoro-mixed-flink-runtime-1.15/pom.xml
deleted file mode 100644
index 9da280767..000000000
---
a/amoro-format-mixed/amoro-mixed-flink/v1.15/amoro-mixed-flink-runtime-1.15/pom.xml
+++ /dev/null
@@ -1,253 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.amoro</groupId>
- <artifactId>amoro-mixed-flink</artifactId>
- <version>0.9-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <artifactId>amoro-format-mixed-flink-runtime-1.15</artifactId>
- <name>Amoro Project Mixed Format Flink 1.15 Runtime</name>
- <url>https://amoro.apache.org</url>
-
- <properties>
- <iceberg.version>1.4.3</iceberg.version>
- <flink.version>1.15.3</flink.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.amoro</groupId>
- <artifactId>amoro-format-mixed-flink-1.15</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka</artifactId>
- <version>${flink.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.github.luben</groupId>
- <artifactId>zstd-jni</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${kafka.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.github.luben</groupId>
- <artifactId>zstd-jni</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <id>shade-amoro</id>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
-
<createDependencyReducedPom>false</createDependencyReducedPom>
- <artifactSet>
- <includes combine.children="append">
- <include>org.apache.amoro:*</include>
- <include>org.apache.iceberg:*</include>
-
<include>com.fasterxml.jackson.core:*</include>
- <include>org.apache.parquet:*</include>
- <include>org.apache.commons:*</include>
- <include>commons-lang:*</include>
-
<include>com.github.ben-manes.caffeine:*</include>
- <include>org.apache.avro:*</include>
- <include>org.apache.orc:*</include>
- <include>io.airlift:*</include>
- <include>commons-collections:*</include>
- <include>cglib:*</include>
- <include>com.google.guava:*</include>
- <include>asm:*</include>
-
<include>org.apache.httpcomponents.client5:*</include>
-
<include>org.apache.httpcomponents.core5:*</include>
-
<include>org.apache.flink:flink-connector-kafka</include>
- <include>org.apache.kafka:*</include>
- <include>com.github.luben:*</include>
- </includes>
- </artifactSet>
-
- <relocations>
- <relocation>
- <pattern>org.apache.iceberg</pattern>
-
<shadedPattern>org.apache.amoro.shade.org.apache.iceberg</shadedPattern>
- <excludes>
-
<exclude>org.apache.iceberg.mr.hive.*</exclude>
- </excludes>
- </relocation>
-
- <relocation>
- <pattern>org.apache.parquet</pattern>
-
<shadedPattern>org.apache.amoro.shade.org.apache.parquet</shadedPattern>
- </relocation>
-
- <relocation>
- <pattern>org.apache.commons</pattern>
-
<shadedPattern>org.apache.amoro.shade.org.apache.commons
- </shadedPattern>
- </relocation>
-
- <relocation>
- <pattern>org.apache.avro</pattern>
-
<shadedPattern>org.apache.amoro.shade.org.apache.avro</shadedPattern>
- </relocation>
-
- <relocation>
- <pattern>org.apache.orc</pattern>
-
<shadedPattern>org.apache.amoro.shade.org.apache.orc</shadedPattern>
- </relocation>
-
- <relocation>
- <pattern>org.apache.hc</pattern>
-
<shadedPattern>org.apache.amoro.shade.org.apache.hc</shadedPattern>
- </relocation>
-
- <relocation>
- <pattern>org.apache.jute</pattern>
-
<shadedPattern>org.apache.amoro.shade.org.apache.jute</shadedPattern>
- </relocation>
-
- <relocation>
- <pattern>org.apache.kafka</pattern>
-
<shadedPattern>org.apache.amoro.shade.org.apache.kafka</shadedPattern>
- </relocation>
-
- <relocation>
- <pattern>shaded.parquet</pattern>
-
<shadedPattern>org.apache.amoro.shade.shaded.parquet</shadedPattern>
- </relocation>
-
- <relocation>
- <pattern>com.fasterxml</pattern>
-
<shadedPattern>org.apache.amoro.shade.com.fasterxml</shadedPattern>
- </relocation>
-
- <relocation>
-
<pattern>com.github.benmanes.caffeine</pattern>
- <shadedPattern>
-
org.apache.amoro.shade.com.github.benmanes.caffeine
- </shadedPattern>
- </relocation>
-
- <relocation>
- <pattern>org.threeten.extra</pattern>
-
<shadedPattern>org.apache.amoro.shade.org.threeten.extra</shadedPattern>
- </relocation>
-
- <relocation>
- <pattern>net.sf.cglib</pattern>
-
<shadedPattern>org.apache.amoro.shade.net.sf.cglib</shadedPattern>
- </relocation>
-
- <relocation>
- <pattern>com.google</pattern>
-
<shadedPattern>org.apache.amoro.shade.com.google</shadedPattern>
- </relocation>
-
- <relocation>
- <pattern>org.objectweb.asm</pattern>
-
<shadedPattern>org.apache.amoro.shade.org.objectweb.asm</shadedPattern>
- </relocation>
-
- <relocation>
- <pattern>com.facebook.fb303</pattern>
-
<shadedPattern>org.apache.amoro.shade.com.facebook.fb303</shadedPattern>
- </relocation>
-
- <relocation>
- <pattern>io.airlift</pattern>
-
<shadedPattern>org.apache.amoro.shade.io.airlift</shadedPattern>
- </relocation>
-
- <!-- flink-sql-connector-kafka -->
- <relocation>
-
<pattern>org.apache.flink.connector.kafka</pattern>
- <shadedPattern>
-
org.apache.amoro.shade.org.apache.flink.connector.kafka
- </shadedPattern>
- </relocation>
- <relocation>
-
<pattern>org.apache.flink.streaming.connectors.kafka</pattern>
- <shadedPattern>
-
org.apache.amoro.shade.org.apache.flink.streaming.connectors.kafka
- </shadedPattern>
- </relocation>
- <relocation>
- <pattern>
-
org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
- </pattern>
- <shadedPattern>
-
org.apache.amoro.shade.org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
- </shadedPattern>
- </relocation>
- <relocation>
- <pattern>
-
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
- </pattern>
- <shadedPattern>
-
org.apache.amoro.shade.org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
- </shadedPattern>
- </relocation>
- <relocation>
- <pattern>
-
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema
- </pattern>
- <shadedPattern>
-
org.apache.amoro.shade.org.apache.flink.streaming.util.serialization.KeyedSerializationSchema
- </shadedPattern>
- </relocation>
- <relocation>
- <pattern>
-
org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema
- </pattern>
- <shadedPattern>
-
org.apache.amoro.shade.org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema
- </shadedPattern>
- </relocation>
- </relocations>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/docs/_index.md b/docs/_index.md
index b219e33d1..1e23abd36 100644
--- a/docs/_index.md
+++ b/docs/_index.md
@@ -67,12 +67,12 @@ For details, please refer to: [Paimon
Docs](https://paimon.apache.org/docs/maste
Amoro support multiple processing engines for Mixed format as below:
-| Processing Engine | Version | Batch Read | Batch Write |
Batch Overwrite | Streaming Read | Streaming Write | Create Table | Alter Table
|
-|-------------------|---------------------------|-------------|-------------|-----------------|----------------|-----------------|--------------|-------------|
-| Flink | 1.15.x, 1.16.x and 1.17.x | ✔ | ✔ |
✖ | ✔ | ✔ | ✔ |
✖ |
-| Spark | 3.2, 3.3, 3.5 | ✔ | ✔ |
✔ | ✖ | ✖ | ✔ |
✔ |
-| Hive | 2.x, 3.x | ✔ | ✖ |
✔ | ✖ | ✖ | ✖ | ✔ |
-| Trino | 406 | ✔ | ✖ |
✔ | ✖ | ✖ | ✖ | ✔ |
+| Processing Engine | Version | Batch Read | Batch Write |
Batch Overwrite | Streaming Read | Streaming Write | Create Table | Alter Table
|
+|-------------------|------------------------|-------------|-------------|-----------------|----------------|-----------------|--------------|-------------|
+| Flink | 1.16.x, 1.17.x, 1.18.x | ✔ | ✔ |
✖ | ✔ | ✔ | ✔ | ✖
|
+| Spark | 3.2, 3.3, 3.5 | ✔ | ✔ |
✔ | ✖ | ✖ | ✔ | ✔
|
+| Hive | 2.x, 3.x | ✔ | ✖ |
✔ | ✖ | ✖ | ✖ | ✔ |
+| Trino | 406 | ✔ | ✖ |
✔ | ✖ | ✖ | ✖ | ✔ |
## User cases