This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new a3576d9e9 [FLINK-38726][fluss] Bump Fluss version to 0.9.0-incubating
a3576d9e9 is described below
commit a3576d9e9f9d173278239868ae829d9aca7a65f0
Author: Leonard Xu <[email protected]>
AuthorDate: Fri Mar 6 11:11:55 2026 +0800
[FLINK-38726][fluss] Bump Fluss version to 0.9.0-incubating
This closes #4180.
---
.../flink-cdc-pipeline-connector-fluss/pom.xml | 16 +-
.../fluss/factory/FlussDataSinkFactory.java | 4 +-
.../cdc/connectors/fluss/sink/FlussDataSink.java | 2 +-
.../fluss/sink/FlussEventSerializationSchema.java | 17 +-
.../fluss/sink/FlussMetaDataApplier.java | 16 +-
.../connectors/fluss/sink/row/CdcAsFlussArray.java | 175 +++++++++++++++++++++
.../CdcAsFlussMap.java} | 39 +++--
.../fluss/sink/{ => row}/CdcAsFlussRow.java | 30 +++-
.../cdc/connectors/fluss/sink/v2/FlussEvent.java | 2 +-
.../fluss/sink/v2/FlussEventSerializer.java | 2 +-
.../connectors/fluss/sink/v2/FlussRowWithOp.java | 4 +-
.../cdc/connectors/fluss/sink/v2/FlussSink.java | 2 +-
.../connectors/fluss/sink/v2/FlussSinkWriter.java | 24 +--
.../fluss/sink/v2/metrics/WrappedFlussCounter.java | 4 +-
.../fluss/sink/v2/metrics/WrappedFlussGauge.java | 4 +-
.../sink/v2/metrics/WrapperFlussHistogram.java | 8 +-
.../fluss/sink/v2/metrics/WrapperFlussMeter.java | 4 +-
.../v2/metrics/WrapperFlussMetricRegistry.java | 16 +-
.../connectors/fluss/utils/FlussConversions.java | 109 +++++++------
.../cdc/connectors/fluss/FlussPipelineITCase.java | 26 +--
.../sink/FlussEventSerializationSchemaTest.java | 9 +-
.../fluss/sink/FlussMetadataApplierTest.java | 75 ++++-----
.../connectors/fluss/sink/v2/FlussSinkITCase.java | 12 +-
.../fluss/utils/FlussConversionsTest.java | 82 +++++-----
.../flink-cdc-pipeline-e2e-tests/pom.xml | 17 +-
.../flink/cdc/pipeline/tests/FlussE2eITCase.java | 8 +-
26 files changed, 458 insertions(+), 249 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/pom.xml
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/pom.xml
index 2634c7aa2..816d99fd5 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/pom.xml
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/pom.xml
@@ -33,12 +33,12 @@ limitations under the License.
<properties>
- <fluss.version>0.7.0</fluss.version>
+ <fluss.version>0.9.0-incubating</fluss.version>
</properties>
<dependencies>
<dependency>
- <groupId>com.alibaba.fluss</groupId>
+ <groupId>org.apache.fluss</groupId>
<artifactId>fluss-client</artifactId>
<version>${fluss.version}</version>
</dependency>
@@ -66,34 +66,34 @@ limitations under the License.
</dependency>
<dependency>
- <groupId>com.alibaba.fluss</groupId>
+ <groupId>org.apache.fluss</groupId>
<artifactId>fluss-server</artifactId>
<version>${fluss.version}</version>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.alibaba.fluss</groupId>
+ <groupId>org.apache.fluss</groupId>
<artifactId>fluss-server</artifactId>
<version>${fluss.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.alibaba.fluss</groupId>
+ <groupId>org.apache.fluss</groupId>
<artifactId>fluss-test-utils</artifactId>
<version>${fluss.version}</version>
<scope>test</scope>
</dependency>
<!-- In Flink CDC project has Pipeline Sink Connector for Fluss. we
import fluss-fink for Fluss Sink Connector just for test purpose -->
<dependency>
- <groupId>com.alibaba.fluss</groupId>
+ <groupId>org.apache.fluss</groupId>
<artifactId>fluss-flink-common</artifactId>
<version>${fluss.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.alibaba.fluss</groupId>
+ <groupId>org.apache.fluss</groupId>
<artifactId>fluss-flink-1.20</artifactId>
<version>${fluss.version}</version>
<scope>test</scope>
@@ -129,7 +129,7 @@ limitations under the License.
<shadeTestJar>false</shadeTestJar>
<artifactSet>
<includes>
- <include>com.alibaba.fluss:*</include>
+ <include>org.apache.fluss:*</include>
</includes>
</artifactSet>
</configuration>
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/factory/FlussDataSinkFactory.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/factory/FlussDataSinkFactory.java
index 754eb693a..58e469d2d 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/factory/FlussDataSinkFactory.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/factory/FlussDataSinkFactory.java
@@ -23,8 +23,8 @@ import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.connectors.fluss.sink.FlussDataSink;
-import com.alibaba.fluss.config.ConfigOptions;
-import com.alibaba.fluss.config.Configuration;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
import java.util.HashMap;
import java.util.HashSet;
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussDataSink.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussDataSink.java
index e03caf8e6..4491e5032 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussDataSink.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussDataSink.java
@@ -25,7 +25,7 @@ import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussSink;
-import com.alibaba.fluss.config.Configuration;
+import org.apache.fluss.config.Configuration;
import java.util.List;
import java.util.Map;
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java
index e92557b11..086da5894 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java
@@ -24,14 +24,15 @@ import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.connectors.fluss.sink.row.CdcAsFlussRow;
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEvent;
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEventSerializer;
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussRowWithOp;
-import com.alibaba.fluss.client.Connection;
-import com.alibaba.fluss.client.table.Table;
-import com.alibaba.fluss.metadata.TablePath;
-import com.alibaba.fluss.types.DataType;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataType;
import java.io.IOException;
import java.util.Collections;
@@ -129,12 +130,12 @@ public class FlussEventSerializationSchema implements
FlussEventSerializer<Event
private static class TableSchemaInfo {
org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema;
- com.alibaba.fluss.metadata.Schema downStreamFlusstreamSchema;
+ org.apache.fluss.metadata.Schema downStreamFlusstreamSchema;
Map<Integer, Integer> indexMapping;
private TableSchemaInfo(
org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema,
- com.alibaba.fluss.metadata.Schema downStreamFlusstreamSchema) {
+ org.apache.fluss.metadata.Schema downStreamFlusstreamSchema) {
this.upstreamCdcSchema = upstreamCdcSchema;
this.downStreamFlusstreamSchema = downStreamFlusstreamSchema;
this.indexMapping =
@@ -144,8 +145,8 @@ public class FlussEventSerializationSchema implements
FlussEventSerializer<Event
}
static Map<Integer, Integer> sanityCheckAndGenerateIndexMapping(
- com.alibaba.fluss.metadata.Schema inferredFlussSchema,
- com.alibaba.fluss.metadata.Schema currentFlussNewSchema) {
+ org.apache.fluss.metadata.Schema inferredFlussSchema,
+ org.apache.fluss.metadata.Schema currentFlussNewSchema) {
List<String> inferredSchemaColumnNames =
inferredFlussSchema.getColumnNames();
Map<String, Integer> reverseIndex = new HashMap<>();
for (int i = 0; i < inferredSchemaColumnNames.size(); i++) {
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
index 2e13b70d9..4e28623e2 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java
@@ -26,14 +26,14 @@ import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.table.api.ValidationException;
-import com.alibaba.fluss.client.Connection;
-import com.alibaba.fluss.client.ConnectionFactory;
-import com.alibaba.fluss.client.admin.Admin;
-import com.alibaba.fluss.config.Configuration;
-import com.alibaba.fluss.metadata.DatabaseDescriptor;
-import com.alibaba.fluss.metadata.TableDescriptor;
-import com.alibaba.fluss.metadata.TableInfo;
-import com.alibaba.fluss.metadata.TablePath;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussArray.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussArray.java
new file mode 100644
index 000000000..4fa32e6a8
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussArray.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.sink.row;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.TimestampData;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+
+import static
org.apache.flink.cdc.connectors.fluss.sink.row.CdcAsFlussRow.fromFlinkDecimal;
+
+/** Wraps a CDC {@link ArrayData} as a Fluss {@link InternalArray}. */
+public class CdcAsFlussArray implements InternalArray {
+
+ private final ArrayData flussArray;
+
+ public CdcAsFlussArray(ArrayData flussArray) {
+ this.flussArray = flussArray;
+ }
+
+ @Override
+ public int size() {
+ return flussArray.size();
+ }
+
+ @Override
+ public boolean[] toBooleanArray() {
+ return flussArray.toBooleanArray();
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ return flussArray.toByteArray();
+ }
+
+ @Override
+ public short[] toShortArray() {
+ return flussArray.toShortArray();
+ }
+
+ @Override
+ public int[] toIntArray() {
+ return flussArray.toIntArray();
+ }
+
+ @Override
+ public long[] toLongArray() {
+ return flussArray.toLongArray();
+ }
+
+ @Override
+ public float[] toFloatArray() {
+ return flussArray.toFloatArray();
+ }
+
+ @Override
+ public double[] toDoubleArray() {
+ return flussArray.toDoubleArray();
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ return flussArray.isNullAt(pos);
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return flussArray.getBoolean(pos);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ return flussArray.getByte(pos);
+ }
+
+ @Override
+ public short getShort(int pos) {
+ return flussArray.getShort(pos);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ return flussArray.getInt(pos);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ return flussArray.getLong(pos);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return flussArray.getFloat(pos);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return flussArray.getDouble(pos);
+ }
+
+ @Override
+ public BinaryString getChar(int pos, int length) {
+ return BinaryString.fromBytes(flussArray.getString(pos).toBytes());
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ return BinaryString.fromBytes(flussArray.getString(pos).toBytes());
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ return fromFlinkDecimal(flussArray.getDecimal(pos, precision, scale));
+ }
+
+ @Override
+ public TimestampNtz getTimestampNtz(int pos, int precision) {
+ TimestampData timestamp = flussArray.getTimestamp(pos, precision);
+ return TimestampNtz.fromMillis(
+ timestamp.getMillisecond(), timestamp.getNanoOfMillisecond());
+ }
+
+ @Override
+ public TimestampLtz getTimestampLtz(int pos, int precision) {
+ TimestampData timestamp = flussArray.getTimestamp(pos, precision);
+ return TimestampLtz.fromEpochMillis(
+ timestamp.getMillisecond(), timestamp.getNanoOfMillisecond());
+ }
+
+ @Override
+ public byte[] getBinary(int pos, int length) {
+ return flussArray.getBinary(pos);
+ }
+
+ @Override
+ public byte[] getBytes(int pos) {
+ return flussArray.getBinary(pos);
+ }
+
+ @Override
+ public InternalArray getArray(int pos) {
+ return new CdcAsFlussArray(flussArray.getArray(pos));
+ }
+
+ @Override
+ public InternalMap getMap(int pos) {
+ return new CdcAsFlussMap(flussArray.getMap(pos));
+ }
+
+ @Override
+ public InternalRow getRow(int pos, int numFields) {
+ return CdcAsFlussRow.replace(flussArray.getRecord(pos, numFields));
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussMap.java
similarity index 52%
copy from
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java
copy to
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussMap.java
index dadf0d2ee..b209001ee 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussMap.java
@@ -15,21 +15,34 @@
* limitations under the License.
*/
-package org.apache.flink.cdc.connectors.fluss.sink.v2;
+package org.apache.flink.cdc.connectors.fluss.sink.row;
-import com.alibaba.fluss.client.Connection;
+import org.apache.flink.cdc.common.data.MapData;
-import java.io.IOException;
-import java.io.Serializable;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
-/**
- * Serializer to serialize the input record to a {@link FlussEvent} for {@link
FlussSinkWriter}.
- *
- * @param <InputT> The type of the input record which comes from the upstream
and will be
- * transformed into a FlussEvent here.
- */
-public interface FlussEventSerializer<InputT> extends Serializable {
- void open(Connection connection) throws IOException;
+/** Wraps a Cdc {@link MapData} as a Fluss {@link InternalMap}. */
+public class CdcAsFlussMap implements InternalMap {
+
+ private final MapData cdcMap;
+
+ public CdcAsFlussMap(MapData cdcMap) {
+ this.cdcMap = cdcMap;
+ }
+
+ @Override
+ public int size() {
+ return cdcMap.size();
+ }
+
+ @Override
+ public InternalArray keyArray() {
+ return new CdcAsFlussArray(cdcMap.keyArray());
+ }
- FlussEvent serialize(InputT in) throws IOException;
+ @Override
+ public InternalArray valueArray() {
+ return new CdcAsFlussArray(cdcMap.valueArray());
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/CdcAsFlussRow.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussRow.java
similarity index 87%
rename from
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/CdcAsFlussRow.java
rename to
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussRow.java
index 5891d58a7..4a97ebc22 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/CdcAsFlussRow.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussRow.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.cdc.connectors.fluss.sink;
+package org.apache.flink.cdc.connectors.fluss.sink.row;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
@@ -24,11 +24,13 @@ import
org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.TimestampData;
-import com.alibaba.fluss.row.BinaryString;
-import com.alibaba.fluss.row.Decimal;
-import com.alibaba.fluss.row.InternalRow;
-import com.alibaba.fluss.row.TimestampLtz;
-import com.alibaba.fluss.row.TimestampNtz;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
import java.util.Map;
import java.util.stream.Collectors;
@@ -167,6 +169,22 @@ public class CdcAsFlussRow implements InternalRow {
return cdcRecord.getBinary(indexMapping.get(pos));
}
+ @Override
+ public InternalArray getArray(int i) {
+ return new CdcAsFlussArray(cdcRecord.getArray(indexMapping.get(i)));
+ }
+
+ @Override
+ public InternalMap getMap(int i) {
+ return new CdcAsFlussMap(cdcRecord.getMap(indexMapping.get(i)));
+ }
+
+ @Override
+ public InternalRow getRow(int i, int numFields) {
+ return new CdcAsFlussRow(
+ cdcRecord.getRow(indexMapping.get(i), numFields), numFields,
indexMapping);
+ }
+
@VisibleForTesting
public RecordData getCdcRecord() {
return cdcRecord;
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEvent.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEvent.java
index 75df49fa6..a31d07bd4 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEvent.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEvent.java
@@ -17,7 +17,7 @@
package org.apache.flink.cdc.connectors.fluss.sink.v2;
-import com.alibaba.fluss.metadata.TablePath;
+import org.apache.fluss.metadata.TablePath;
import java.util.List;
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java
index dadf0d2ee..c90cdc913 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussEventSerializer.java
@@ -17,7 +17,7 @@
package org.apache.flink.cdc.connectors.fluss.sink.v2;
-import com.alibaba.fluss.client.Connection;
+import org.apache.fluss.client.Connection;
import java.io.IOException;
import java.io.Serializable;
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussRowWithOp.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussRowWithOp.java
index 0dedaf6cc..2042dbdc4 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussRowWithOp.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussRowWithOp.java
@@ -17,13 +17,13 @@
package org.apache.flink.cdc.connectors.fluss.sink.v2;
-import com.alibaba.fluss.row.InternalRow;
+import org.apache.fluss.row.InternalRow;
import javax.annotation.Nullable;
import java.util.Objects;
-import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
/* This file is based on source code of Apache Fluss Project
(https://fluss.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSink.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSink.java
index 4e6d789e9..0bd74f060 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSink.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSink.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
-import com.alibaba.fluss.config.Configuration;
+import org.apache.fluss.config.Configuration;
import java.io.IOException;
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkWriter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkWriter.java
index dab9e6ed5..8a3754e0e 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkWriter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkWriter.java
@@ -23,18 +23,18 @@ import
org.apache.flink.cdc.connectors.fluss.sink.v2.metrics.WrapperFlussMetricR
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import com.alibaba.fluss.client.Connection;
-import com.alibaba.fluss.client.ConnectionFactory;
-import com.alibaba.fluss.client.table.Table;
-import com.alibaba.fluss.client.table.writer.AppendWriter;
-import com.alibaba.fluss.client.table.writer.TableWriter;
-import com.alibaba.fluss.client.table.writer.UpsertWriter;
-import com.alibaba.fluss.config.Configuration;
-import com.alibaba.fluss.metadata.TablePath;
-import com.alibaba.fluss.metrics.Gauge;
-import com.alibaba.fluss.metrics.Metric;
-import com.alibaba.fluss.metrics.MetricNames;
-import com.alibaba.fluss.row.InternalRow;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.client.table.writer.TableWriter;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.Gauge;
+import org.apache.fluss.metrics.Metric;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.row.InternalRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussCounter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussCounter.java
index 61bcff7b0..48d094096 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussCounter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussCounter.java
@@ -26,9 +26,9 @@ import org.apache.flink.metrics.Counter;
/** An implementation of Flink's {@link Counter} which wraps Fluss's Counter.
*/
public class WrappedFlussCounter implements Counter {
- private final com.alibaba.fluss.metrics.Counter flussCounter;
+ private final org.apache.fluss.metrics.Counter flussCounter;
- public WrappedFlussCounter(com.alibaba.fluss.metrics.Counter flussCounter)
{
+ public WrappedFlussCounter(org.apache.fluss.metrics.Counter flussCounter) {
this.flussCounter = flussCounter;
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussGauge.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussGauge.java
index 1b5df21d0..45d7adaaf 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussGauge.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussGauge.java
@@ -26,9 +26,9 @@ import org.apache.flink.metrics.Gauge;
/** An implementation of Flink's {@link Gauge} which wraps Fluss's Gauge. */
public class WrappedFlussGauge<T> implements Gauge<T> {
- private final com.alibaba.fluss.metrics.Gauge<T> flussGauge;
+ private final org.apache.fluss.metrics.Gauge<T> flussGauge;
- public WrappedFlussGauge(com.alibaba.fluss.metrics.Gauge<T> flussGauge) {
+ public WrappedFlussGauge(org.apache.fluss.metrics.Gauge<T> flussGauge) {
this.flussGauge = flussGauge;
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussHistogram.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussHistogram.java
index a62a646a3..402c5cc1a 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussHistogram.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussHistogram.java
@@ -27,9 +27,9 @@ import org.apache.flink.metrics.HistogramStatistics;
/** An implementation of Flink's {@link Histogram} which wraps Fluss's
Histogram. */
public class WrapperFlussHistogram implements Histogram {
- private final com.alibaba.fluss.metrics.Histogram flussHistogram;
+ private final org.apache.fluss.metrics.Histogram flussHistogram;
- public WrapperFlussHistogram(com.alibaba.fluss.metrics.Histogram
flussHistogram) {
+ public WrapperFlussHistogram(org.apache.fluss.metrics.Histogram
flussHistogram) {
this.flussHistogram = flussHistogram;
}
@@ -50,10 +50,10 @@ public class WrapperFlussHistogram implements Histogram {
private static class FlinkHistogramStatistics extends HistogramStatistics {
- private final com.alibaba.fluss.metrics.HistogramStatistics
flussHistogramStatistics;
+ private final org.apache.fluss.metrics.HistogramStatistics
flussHistogramStatistics;
public FlinkHistogramStatistics(
- com.alibaba.fluss.metrics.HistogramStatistics
flussHistogramStatistics) {
+ org.apache.fluss.metrics.HistogramStatistics
flussHistogramStatistics) {
this.flussHistogramStatistics = flussHistogramStatistics;
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMeter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMeter.java
index 40bac46a8..0b415b202 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMeter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMeter.java
@@ -26,9 +26,9 @@ import org.apache.flink.metrics.Meter;
/** An implementation of Flink's {@link Meter} which wraps Fluss's Meter. */
public class WrapperFlussMeter implements Meter {
- private final com.alibaba.fluss.metrics.Meter flussMeter;
+ private final org.apache.fluss.metrics.Meter flussMeter;
- public WrapperFlussMeter(com.alibaba.fluss.metrics.Meter flussMeter) {
+ public WrapperFlussMeter(org.apache.fluss.metrics.Meter flussMeter) {
this.flussMeter = flussMeter;
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMetricRegistry.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMetricRegistry.java
index 4251a71d6..39932bc2e 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMetricRegistry.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMetricRegistry.java
@@ -19,14 +19,14 @@ package
org.apache.flink.cdc.connectors.fluss.sink.v2.metrics;
import org.apache.flink.metrics.MetricGroup;
-import com.alibaba.fluss.metrics.CharacterFilter;
-import com.alibaba.fluss.metrics.Counter;
-import com.alibaba.fluss.metrics.Gauge;
-import com.alibaba.fluss.metrics.Histogram;
-import com.alibaba.fluss.metrics.Meter;
-import com.alibaba.fluss.metrics.Metric;
-import com.alibaba.fluss.metrics.groups.AbstractMetricGroup;
-import com.alibaba.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.metrics.CharacterFilter;
+import org.apache.fluss.metrics.Counter;
+import org.apache.fluss.metrics.Gauge;
+import org.apache.fluss.metrics.Histogram;
+import org.apache.fluss.metrics.Meter;
+import org.apache.fluss.metrics.Metric;
+import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.registry.MetricRegistry;
import java.util.Collections;
import java.util.HashMap;
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java
index eb311dd0e..25a9f53b6 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java
@@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.types.BigIntType;
import org.apache.flink.cdc.common.types.BinaryType;
import org.apache.flink.cdc.common.types.BooleanType;
import org.apache.flink.cdc.common.types.CharType;
+import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DateType;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.DoubleType;
@@ -37,11 +38,12 @@ import org.apache.flink.cdc.common.types.TinyIntType;
import org.apache.flink.cdc.common.types.VarBinaryType;
import org.apache.flink.cdc.common.types.VarCharType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
+import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.util.CollectionUtil;
-import com.alibaba.fluss.annotation.VisibleForTesting;
-import com.alibaba.fluss.metadata.Schema;
-import com.alibaba.fluss.metadata.TableDescriptor;
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
import javax.annotation.Nullable;
@@ -85,7 +87,7 @@ public class FlussConversions {
.build();
}
- public static com.alibaba.fluss.metadata.Schema toFlussSchema(
+ public static org.apache.fluss.metadata.Schema toFlussSchema(
org.apache.flink.cdc.common.schema.Schema cdcSchema) {
Schema.Builder schemBuilder = Schema.newBuilder();
if (!CollectionUtil.isNullOrEmpty(cdcSchema.primaryKeys())) {
@@ -108,7 +110,7 @@ public class FlussConversions {
}
@VisibleForTesting
- private static com.alibaba.fluss.types.DataType toFlussType(
+ private static org.apache.fluss.types.DataType toFlussType(
org.apache.flink.cdc.common.types.DataType flinkDataType) {
return flinkDataType.accept(TO_FLUSS_TYPE_INSTANCE);
}
@@ -137,119 +139,128 @@ public class FlussConversions {
private static class CdcTypeToFlussType
implements org.apache.flink.cdc.common.types.DataTypeVisitor<
- com.alibaba.fluss.types.DataType> {
+ org.apache.fluss.types.DataType> {
@Override
- public com.alibaba.fluss.types.DataType visit(CharType charType) {
- return new com.alibaba.fluss.types.CharType(
- charType.isNullable(), charType.getLength());
+ public org.apache.fluss.types.DataType visit(CharType charType) {
+ return new org.apache.fluss.types.CharType(charType.isNullable(),
charType.getLength());
}
@Override
- public com.alibaba.fluss.types.DataType visit(VarCharType varCharType)
{
+ public org.apache.fluss.types.DataType visit(VarCharType varCharType) {
// fluss not support varchar type
- return new
com.alibaba.fluss.types.StringType(varCharType.isNullable());
+ return new
org.apache.fluss.types.StringType(varCharType.isNullable());
}
@Override
- public com.alibaba.fluss.types.DataType visit(BooleanType booleanType)
{
- return new
com.alibaba.fluss.types.BooleanType(booleanType.isNullable());
+ public org.apache.fluss.types.DataType visit(BooleanType booleanType) {
+ return new
org.apache.fluss.types.BooleanType(booleanType.isNullable());
}
@Override
- public com.alibaba.fluss.types.DataType visit(BinaryType binaryType) {
- return new com.alibaba.fluss.types.BinaryType(
+ public org.apache.fluss.types.DataType visit(BinaryType binaryType) {
+ return new org.apache.fluss.types.BinaryType(
binaryType.isNullable(), binaryType.getLength());
}
@Override
- public com.alibaba.fluss.types.DataType visit(VarBinaryType
varBinaryType) {
+ public org.apache.fluss.types.DataType visit(VarBinaryType
varBinaryType) {
// fluss not support varbinary type
- return new
com.alibaba.fluss.types.BytesType(varBinaryType.isNullable());
+ return new
org.apache.fluss.types.BytesType(varBinaryType.isNullable());
}
@Override
- public com.alibaba.fluss.types.DataType visit(DecimalType decimalType)
{
- return new com.alibaba.fluss.types.DecimalType(
+ public org.apache.fluss.types.DataType visit(DecimalType decimalType) {
+ return new org.apache.fluss.types.DecimalType(
decimalType.isNullable(), decimalType.getPrecision(),
decimalType.getScale());
}
@Override
- public com.alibaba.fluss.types.DataType visit(TinyIntType tinyIntType)
{
- return new
com.alibaba.fluss.types.TinyIntType(tinyIntType.isNullable());
+ public org.apache.fluss.types.DataType visit(TinyIntType tinyIntType) {
+ return new
org.apache.fluss.types.TinyIntType(tinyIntType.isNullable());
}
@Override
- public com.alibaba.fluss.types.DataType visit(SmallIntType
smallIntType) {
- return new
com.alibaba.fluss.types.SmallIntType(smallIntType.isNullable());
+ public org.apache.fluss.types.DataType visit(SmallIntType
smallIntType) {
+ return new
org.apache.fluss.types.SmallIntType(smallIntType.isNullable());
}
@Override
- public com.alibaba.fluss.types.DataType visit(IntType intType) {
- return new com.alibaba.fluss.types.IntType(intType.isNullable());
+ public org.apache.fluss.types.DataType visit(IntType intType) {
+ return new org.apache.fluss.types.IntType(intType.isNullable());
}
@Override
- public com.alibaba.fluss.types.DataType visit(BigIntType bigIntType) {
- return new
com.alibaba.fluss.types.BigIntType(bigIntType.isNullable());
+ public org.apache.fluss.types.DataType visit(BigIntType bigIntType) {
+ return new
org.apache.fluss.types.BigIntType(bigIntType.isNullable());
}
@Override
- public com.alibaba.fluss.types.DataType visit(FloatType floatType) {
- return new
com.alibaba.fluss.types.FloatType(floatType.isNullable());
+ public org.apache.fluss.types.DataType visit(FloatType floatType) {
+ return new
org.apache.fluss.types.FloatType(floatType.isNullable());
}
@Override
- public com.alibaba.fluss.types.DataType visit(DoubleType doubleType) {
- return new
com.alibaba.fluss.types.DoubleType(doubleType.isNullable());
+ public org.apache.fluss.types.DataType visit(DoubleType doubleType) {
+ return new
org.apache.fluss.types.DoubleType(doubleType.isNullable());
}
@Override
- public com.alibaba.fluss.types.DataType visit(DateType dateType) {
- return new com.alibaba.fluss.types.DateType(dateType.isNullable());
+ public org.apache.fluss.types.DataType visit(DateType dateType) {
+ return new org.apache.fluss.types.DateType(dateType.isNullable());
}
@Override
- public com.alibaba.fluss.types.DataType visit(TimeType timeType) {
- return new com.alibaba.fluss.types.TimeType(
+ public org.apache.fluss.types.DataType visit(TimeType timeType) {
+ return new org.apache.fluss.types.TimeType(
timeType.isNullable(), timeType.getPrecision());
}
@Override
- public com.alibaba.fluss.types.DataType visit(TimestampType
timestampType) {
- return new com.alibaba.fluss.types.TimestampType(
+ public org.apache.fluss.types.DataType visit(TimestampType
timestampType) {
+ return new org.apache.fluss.types.TimestampType(
timestampType.isNullable(), timestampType.getPrecision());
}
@Override
- public com.alibaba.fluss.types.DataType visit(ZonedTimestampType
zonedTimestampType) {
+ public org.apache.fluss.types.DataType visit(ZonedTimestampType
zonedTimestampType) {
throw new UnsupportedOperationException(
"Unsupported data type in fluss " + zonedTimestampType);
}
@Override
- public com.alibaba.fluss.types.DataType visit(
+ public org.apache.fluss.types.DataType visit(
LocalZonedTimestampType localZonedTimestampType) {
- return new com.alibaba.fluss.types.LocalZonedTimestampType(
+ return new org.apache.fluss.types.LocalZonedTimestampType(
localZonedTimestampType.isNullable(),
localZonedTimestampType.getPrecision());
}
@Override
- public com.alibaba.fluss.types.DataType visit(ArrayType arrayType) {
- throw new UnsupportedOperationException(
- "Unsupported data type in fluss version under 0.7: " +
arrayType);
+ public org.apache.fluss.types.DataType visit(ArrayType arrayType) {
+ List<DataType> children = arrayType.getChildren();
+ Preconditions.checkState(!children.isEmpty());
+ org.apache.fluss.types.DataType flussChildType =
toFlussType(children.get(0));
+ return new
org.apache.fluss.types.ArrayType(arrayType.isNullable(), flussChildType);
}
@Override
- public com.alibaba.fluss.types.DataType visit(MapType mapType) {
- throw new UnsupportedOperationException(
- "Unsupported data type in fluss version under 0.7: " +
mapType);
+ public org.apache.fluss.types.DataType visit(MapType mapType) {
+ org.apache.fluss.types.DataType flussKeyType =
toFlussType(mapType.getKeyType());
+ org.apache.fluss.types.DataType flussValueType =
toFlussType(mapType.getValueType());
+ return new org.apache.fluss.types.MapType(
+ mapType.isNullable(), flussKeyType, flussValueType);
}
@Override
- public com.alibaba.fluss.types.DataType visit(RowType rowType) {
- throw new UnsupportedOperationException(
- "Unsupported data type in fluss version under 0.7: " +
rowType);
+ public org.apache.fluss.types.DataType visit(RowType rowType) {
+ return new org.apache.fluss.types.RowType(
+ rowType.isNullable(),
+ rowType.getFields().stream()
+ .map(
+ field ->
+ new
org.apache.fluss.types.DataField(
+ field.getName(),
field.getType().accept(this)))
+ .collect(Collectors.toList()));
}
}
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
index ec2d2b4a2..bfc71a1ad 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
@@ -45,12 +45,12 @@ import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
-import com.alibaba.fluss.client.Connection;
-import com.alibaba.fluss.client.ConnectionFactory;
-import com.alibaba.fluss.config.ConfigOptions;
-import com.alibaba.fluss.config.MemorySize;
-import com.alibaba.fluss.metadata.DataLakeFormat;
-import com.alibaba.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -64,12 +64,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static com.alibaba.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
-import static
com.alibaba.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
-import static
com.alibaba.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
import static
org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_1;
import static
org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_2;
import static
org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
+import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for Fluss Pipeline. */
@@ -320,7 +320,7 @@ public class FlussPipelineITCase {
ValuesDataSourceHelper.singleSplitSingleTable(),
sinkOption))
.rootCause()
- .hasMessageContaining("'table.non-key' is not a Fluss table
property");
+ .hasMessageContaining("'table.non-key' is not a recognized
Fluss table property");
}
@Test
@@ -566,8 +566,8 @@ public class FlussPipelineITCase {
assertResultsIgnoreOrder(rowIter, expectedRows, true);
}
- private static com.alibaba.fluss.config.Configuration initConfig() {
- com.alibaba.fluss.config.Configuration conf = new
com.alibaba.fluss.config.Configuration();
+ private static org.apache.fluss.config.Configuration initConfig() {
+ org.apache.fluss.config.Configuration conf = new
org.apache.fluss.config.Configuration();
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
// set a shorter interval for testing purpose
conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
@@ -584,7 +584,7 @@ public class FlussPipelineITCase {
conf.setString("security.sasl.enabled.mechanisms", "plain");
conf.setString(
"security.sasl.plain.jaas.config",
- "com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule
required "
+ "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule
required "
+ " user_root=\"password\" "
+ " user_guest=\"password2\";");
return conf;
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java
index 7097a72ee..1af59aa97 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchemaTest.java
@@ -36,15 +36,16 @@ import org.apache.flink.cdc.common.types.IntType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.connectors.fluss.sink.row.CdcAsFlussRow;
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEvent;
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType;
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussRowWithOp;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
-import com.alibaba.fluss.client.Connection;
-import com.alibaba.fluss.client.ConnectionFactory;
-import com.alibaba.fluss.metadata.TablePath;
-import com.alibaba.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java
index 9c35f002d..c95aa7868 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java
@@ -24,15 +24,15 @@ import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.IntType;
-import com.alibaba.fluss.client.Connection;
-import com.alibaba.fluss.client.ConnectionFactory;
-import com.alibaba.fluss.client.admin.Admin;
-import com.alibaba.fluss.exception.InvalidConfigException;
-import com.alibaba.fluss.metadata.TableDescriptor;
-import com.alibaba.fluss.metadata.TableInfo;
-import com.alibaba.fluss.metadata.TablePath;
-import com.alibaba.fluss.server.testutils.FlussClusterExtension;
-import com.alibaba.fluss.types.RowType;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.exception.InvalidConfigException;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.types.RowType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -47,7 +47,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
-import static com.alibaba.fluss.config.ConfigOptions.TABLE_REPLICATION_FACTOR;
+import static org.apache.fluss.config.ConfigOptions.TABLE_REPLICATION_FACTOR;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -129,28 +129,28 @@ public class FlussMetadataApplierTest {
DataTypes.TIMESTAMP_LTZ(6)
};
- com.alibaba.fluss.types.DataType[] flussDataTypes =
- new com.alibaba.fluss.types.DataType[] {
- com.alibaba.fluss.types.DataTypes.BINARY(10),
+ org.apache.fluss.types.DataType[] flussDataTypes =
+ new org.apache.fluss.types.DataType[] {
+ org.apache.fluss.types.DataTypes.BINARY(10),
// fluss not support binary, will be mapped to bytes
- com.alibaba.fluss.types.DataTypes.BYTES(),
- com.alibaba.fluss.types.DataTypes.BYTES(),
- com.alibaba.fluss.types.DataTypes.BOOLEAN(),
- com.alibaba.fluss.types.DataTypes.TINYINT(),
- com.alibaba.fluss.types.DataTypes.SMALLINT(),
- new com.alibaba.fluss.types.IntType(false),
- com.alibaba.fluss.types.DataTypes.BIGINT(),
- com.alibaba.fluss.types.DataTypes.FLOAT(),
- com.alibaba.fluss.types.DataTypes.DOUBLE(),
- com.alibaba.fluss.types.DataTypes.DECIMAL(38, 18),
- com.alibaba.fluss.types.DataTypes.CHAR(10),
+ org.apache.fluss.types.DataTypes.BYTES(),
+ org.apache.fluss.types.DataTypes.BYTES(),
+ org.apache.fluss.types.DataTypes.BOOLEAN(),
+ org.apache.fluss.types.DataTypes.TINYINT(),
+ org.apache.fluss.types.DataTypes.SMALLINT(),
+ new org.apache.fluss.types.IntType(false),
+ org.apache.fluss.types.DataTypes.BIGINT(),
+ org.apache.fluss.types.DataTypes.FLOAT(),
+ org.apache.fluss.types.DataTypes.DOUBLE(),
+ org.apache.fluss.types.DataTypes.DECIMAL(38, 18),
+ org.apache.fluss.types.DataTypes.CHAR(10),
// fluss not support varchar, will be mapped to string
- com.alibaba.fluss.types.DataTypes.STRING(),
- com.alibaba.fluss.types.DataTypes.STRING(),
- com.alibaba.fluss.types.DataTypes.DATE(),
- com.alibaba.fluss.types.DataTypes.TIME(),
- com.alibaba.fluss.types.DataTypes.TIMESTAMP(3),
- com.alibaba.fluss.types.DataTypes.TIMESTAMP_LTZ(6)
+ org.apache.fluss.types.DataTypes.STRING(),
+ org.apache.fluss.types.DataTypes.STRING(),
+ org.apache.fluss.types.DataTypes.DATE(),
+ org.apache.fluss.types.DataTypes.TIME(),
+ org.apache.fluss.types.DataTypes.TIMESTAMP(3),
+ org.apache.fluss.types.DataTypes.TIMESTAMP_LTZ(6)
};
try (FlussMetaDataApplier applier =
@@ -188,17 +188,10 @@ public class FlussMetadataApplierTest {
@Test
void testUnsupportedType() throws Exception {
- String[] fieldNames = new String[] {"timestamp_tz_col", "array_col",
"map_col", "row_col"};
+ String[] fieldNames = new String[] {"timestamp_tz_col"};
org.apache.flink.cdc.common.types.DataType[] cdcDataTypes =
- new org.apache.flink.cdc.common.types.DataType[] {
- DataTypes.ARRAY(DataTypes.STRING()),
- DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
- DataTypes.ROW(
- DataTypes.FIELD("name", DataTypes.STRING()),
- DataTypes.FIELD("age", DataTypes.INT())),
- DataTypes.TIMESTAMP_TZ()
- };
+ new org.apache.flink.cdc.common.types.DataType[]
{DataTypes.TIMESTAMP_TZ()};
try (FlussMetaDataApplier applier =
new FlussMetaDataApplier(
@@ -227,10 +220,10 @@ public class FlussMetadataApplierTest {
tablePath,
TableDescriptor.builder()
.schema(
-
com.alibaba.fluss.metadata.Schema.newBuilder()
+
org.apache.fluss.metadata.Schema.newBuilder()
.column(
"id",
-
com.alibaba.fluss.types.DataTypes.INT())
+
org.apache.fluss.types.DataTypes.INT())
.build())
.build(),
true)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
index e65a7d199..1cd80c6a6 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
@@ -40,9 +40,9 @@ import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
-import com.alibaba.fluss.client.Connection;
-import com.alibaba.fluss.client.ConnectionFactory;
-import com.alibaba.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -58,9 +58,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import static com.alibaba.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
-import static
com.alibaba.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
-import static
com.alibaba.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
+import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Integration tests for FlussSink. */
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
index 2f3487528..eddaeb6ff 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.cdc.connectors.fluss.utils;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
-import com.alibaba.fluss.metadata.TableDescriptor;
-import com.alibaba.fluss.types.RowType;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.types.RowType;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -48,7 +48,7 @@ class FlussConversionsTest {
.primaryKey("id")
.build();
- com.alibaba.fluss.metadata.Schema flussSchema =
FlussConversions.toFlussSchema(cdcSchema);
+ org.apache.fluss.metadata.Schema flussSchema =
FlussConversions.toFlussSchema(cdcSchema);
assertThat(flussSchema.getColumnNames()).containsExactly("id", "name");
assertThat(flussSchema.getPrimaryKeyColumnNames()).containsExactly("id");
@@ -62,7 +62,7 @@ class FlussConversionsTest {
.physicalColumn("name", DataTypes.STRING())
.build();
- com.alibaba.fluss.metadata.Schema flussSchema =
FlussConversions.toFlussSchema(cdcSchema);
+ org.apache.fluss.metadata.Schema flussSchema =
FlussConversions.toFlussSchema(cdcSchema);
assertThat(flussSchema.getColumnNames()).hasSize(2);
assertThat(flussSchema.getPrimaryKeyColumnNames()).isEmpty();
@@ -88,33 +88,43 @@ class FlussConversionsTest {
.physicalColumn("time_col", DataTypes.TIME())
.physicalColumn("timestamp_col",
DataTypes.TIMESTAMP(3))
.physicalColumn("ltz_col", DataTypes.TIMESTAMP_LTZ(6))
+ .physicalColumn("arr",
DataTypes.ARRAY(DataTypes.INT()))
+ .physicalColumn("map",
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
.build();
- com.alibaba.fluss.metadata.Schema flussSchema =
FlussConversions.toFlussSchema(cdcSchema);
+ org.apache.fluss.metadata.Schema flussSchema =
FlussConversions.toFlussSchema(cdcSchema);
RowType rowType = flussSchema.getRowType();
- assertThat(rowType.getFieldCount()).isEqualTo(16);
-
-
assertThat(rowType.getTypeAt(0)).isEqualTo(com.alibaba.fluss.types.DataTypes.BOOLEAN());
-
assertThat(rowType.getTypeAt(1)).isEqualTo(com.alibaba.fluss.types.DataTypes.TINYINT());
-
assertThat(rowType.getTypeAt(2)).isEqualTo(com.alibaba.fluss.types.DataTypes.SMALLINT());
-
assertThat(rowType.getTypeAt(3)).isEqualTo(com.alibaba.fluss.types.DataTypes.INT());
-
assertThat(rowType.getTypeAt(4)).isEqualTo(com.alibaba.fluss.types.DataTypes.BIGINT());
-
assertThat(rowType.getTypeAt(5)).isEqualTo(com.alibaba.fluss.types.DataTypes.FLOAT());
-
assertThat(rowType.getTypeAt(6)).isEqualTo(com.alibaba.fluss.types.DataTypes.DOUBLE());
- assertThat(rowType.getTypeAt(7))
- .isEqualTo(com.alibaba.fluss.types.DataTypes.DECIMAL(10, 2));
-
assertThat(rowType.getTypeAt(8)).isEqualTo(com.alibaba.fluss.types.DataTypes.CHAR(10));
+ assertThat(rowType.getFieldCount()).isEqualTo(18);
+
+
assertThat(rowType.getTypeAt(0)).isEqualTo(org.apache.fluss.types.DataTypes.BOOLEAN());
+
assertThat(rowType.getTypeAt(1)).isEqualTo(org.apache.fluss.types.DataTypes.TINYINT());
+
assertThat(rowType.getTypeAt(2)).isEqualTo(org.apache.fluss.types.DataTypes.SMALLINT());
+
assertThat(rowType.getTypeAt(3)).isEqualTo(org.apache.fluss.types.DataTypes.INT());
+
assertThat(rowType.getTypeAt(4)).isEqualTo(org.apache.fluss.types.DataTypes.BIGINT());
+
assertThat(rowType.getTypeAt(5)).isEqualTo(org.apache.fluss.types.DataTypes.FLOAT());
+
assertThat(rowType.getTypeAt(6)).isEqualTo(org.apache.fluss.types.DataTypes.DOUBLE());
+
assertThat(rowType.getTypeAt(7)).isEqualTo(org.apache.fluss.types.DataTypes.DECIMAL(10,
2));
+
assertThat(rowType.getTypeAt(8)).isEqualTo(org.apache.fluss.types.DataTypes.CHAR(10));
// VarChar maps to StringType in Fluss
-
assertThat(rowType.getTypeAt(9)).isEqualTo(com.alibaba.fluss.types.DataTypes.STRING());
-
assertThat(rowType.getTypeAt(10)).isEqualTo(com.alibaba.fluss.types.DataTypes.BINARY(16));
+
assertThat(rowType.getTypeAt(9)).isEqualTo(org.apache.fluss.types.DataTypes.STRING());
+
assertThat(rowType.getTypeAt(10)).isEqualTo(org.apache.fluss.types.DataTypes.BINARY(16));
// VarBinary maps to BytesType in Fluss
-
assertThat(rowType.getTypeAt(11)).isEqualTo(com.alibaba.fluss.types.DataTypes.BYTES());
-
assertThat(rowType.getTypeAt(12)).isEqualTo(com.alibaba.fluss.types.DataTypes.DATE());
-
assertThat(rowType.getTypeAt(13)).isEqualTo(com.alibaba.fluss.types.DataTypes.TIME());
-
assertThat(rowType.getTypeAt(14)).isEqualTo(com.alibaba.fluss.types.DataTypes.TIMESTAMP(3));
+
assertThat(rowType.getTypeAt(11)).isEqualTo(org.apache.fluss.types.DataTypes.BYTES());
+
assertThat(rowType.getTypeAt(12)).isEqualTo(org.apache.fluss.types.DataTypes.DATE());
+
assertThat(rowType.getTypeAt(13)).isEqualTo(org.apache.fluss.types.DataTypes.TIME());
+
assertThat(rowType.getTypeAt(14)).isEqualTo(org.apache.fluss.types.DataTypes.TIMESTAMP(3));
assertThat(rowType.getTypeAt(15))
- .isEqualTo(com.alibaba.fluss.types.DataTypes.TIMESTAMP_LTZ(6));
+ .isEqualTo(org.apache.fluss.types.DataTypes.TIMESTAMP_LTZ(6));
+ assertThat(rowType.getTypeAt(16))
+ .isEqualTo(
+ org.apache.fluss.types.DataTypes.ARRAY(
+ org.apache.fluss.types.DataTypes.INT()));
+ assertThat(rowType.getTypeAt(17))
+ .isEqualTo(
+ org.apache.fluss.types.DataTypes.MAP(
+ org.apache.fluss.types.DataTypes.STRING(),
+ org.apache.fluss.types.DataTypes.INT()));
}
@Test
@@ -126,7 +136,7 @@ class FlussConversionsTest {
.physicalColumn("not_null_col",
DataTypes.INT().notNull())
.build();
- com.alibaba.fluss.metadata.Schema flussSchema =
FlussConversions.toFlussSchema(cdcSchema);
+ org.apache.fluss.metadata.Schema flussSchema =
FlussConversions.toFlussSchema(cdcSchema);
RowType rowType = flussSchema.getRowType();
assertThat(rowType.getTypeAt(0).isNullable()).isTrue();
@@ -146,28 +156,6 @@ class FlussConversionsTest {
.hasMessageContaining("Unsupported data type in fluss");
}
- @Test
- void testToFlussSchemaUnsupportedArrayType() {
- Schema cdcSchema =
- Schema.newBuilder().physicalColumn("arr",
DataTypes.ARRAY(DataTypes.INT())).build();
-
- assertThatThrownBy(() -> FlussConversions.toFlussSchema(cdcSchema))
- .isInstanceOf(UnsupportedOperationException.class)
- .hasMessageContaining("Unsupported data type in fluss");
- }
-
- @Test
- void testToFlussSchemaUnsupportedMapType() {
- Schema cdcSchema =
- Schema.newBuilder()
- .physicalColumn("map",
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
- .build();
-
- assertThatThrownBy(() -> FlussConversions.toFlussSchema(cdcSchema))
- .isInstanceOf(UnsupportedOperationException.class)
- .hasMessageContaining("Unsupported data type in fluss");
- }
-
//
--------------------------------------------------------------------------------------------
// Tests for toFlussTable
//
--------------------------------------------------------------------------------------------
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index a083ce561..8c263f8de 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -42,7 +42,7 @@ limitations under the License.
<maven.plugin.download.version>1.6.8</maven.plugin.download.version>
<iceberg.version>1.10.1</iceberg.version>
<hive.version>2.3.9</hive.version>
- <fluss.version>0.7.0</fluss.version>
+ <fluss.version>0.9.0-incubating</fluss.version>
<jmh.version>1.37</jmh.version>
<hudi.version>1.1.0</hudi.version>
</properties>
@@ -673,10 +673,19 @@ limitations under the License.
</outputDirectory>
</artifactItem>
<artifactItem>
- <groupId>com.alibaba.fluss</groupId>
-
<artifactId>fluss-flink-${flink.major.version}</artifactId>
+ <groupId>org.apache.fluss</groupId>
+
<artifactId>fluss-flink-${flink-major-1.20}</artifactId>
<version>${fluss.version}</version>
-
<destFileName>fluss-sql-connector.jar</destFileName>
+
<destFileName>fluss-flink-${flink-1.20}.jar</destFileName>
+ <type>jar</type>
+
<outputDirectory>${project.build.directory}/dependencies
+ </outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.fluss</groupId>
+
<artifactId>fluss-flink-${flink-major-1.19}</artifactId>
+ <version>${fluss.version}</version>
+
<destFileName>fluss-flink-${flink-1.19}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/FlussE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/FlussE2eITCase.java
index c20808fa3..8f12e20c1 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/FlussE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/FlussE2eITCase.java
@@ -53,7 +53,7 @@ public class FlussE2eITCase extends PipelineTestEnvironment {
private static final Logger LOG =
LoggerFactory.getLogger(FlussE2eITCase.class);
private static final Duration FLUSS_TESTCASE_TIMEOUT =
Duration.ofMinutes(3);
- private static final String flussImageTag = "fluss/fluss:0.7.0";
+ private static final String flussImageTag =
"apache/fluss:0.9.0-incubating";
private static final String zooKeeperImageTag = "zookeeper:3.9.2";
private static final List<String> flussCoordinatorProperties =
@@ -64,7 +64,7 @@ public class FlussE2eITCase extends PipelineTestEnvironment {
"remote.data.dir: /tmp/fluss/remote-data",
"security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT",
"security.sasl.enabled.mechanisms: PLAIN",
- "security.sasl.plain.jaas.config:
com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required
user_admin=\"admin-pass\" user_developer=\"developer-pass\";",
+ "security.sasl.plain.jaas.config:
org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required
user_admin=\"admin-pass\" user_developer=\"developer-pass\";",
"super.users: User:admin");
private static final List<String> flussTabletServerProperties =
@@ -78,7 +78,7 @@ public class FlussE2eITCase extends PipelineTestEnvironment {
"remote.data.dir: /tmp/fluss/remote-data",
"security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT",
"security.sasl.enabled.mechanisms: PLAIN",
- "security.sasl.plain.jaas.config:
com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required
user_admin=\"admin-pass\" user_developer=\"developer-pass\";",
+ "security.sasl.plain.jaas.config:
org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required
user_admin=\"admin-pass\" user_developer=\"developer-pass\";",
"super.users: User:admin");
@Container
@@ -129,7 +129,7 @@ public class FlussE2eITCase extends PipelineTestEnvironment
{
// Due to a bug described in
https://github.com/apache/fluss/pull/1267, it's not viable to
// pass Fluss dependency with `--jar` CLI option. We may remove this
workaround and use
// `submitPipelineJob` to carry extra jar later.
- return Collections.singletonList("fluss-sql-connector.jar");
+ return Collections.singletonList(String.format("fluss-flink-%s.jar",
flinkVersion));
}
@BeforeEach