This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new dba644d27 [flink] Basic $binlog read support without pushdown
optimizations for pk table (#2525)
dba644d27 is described below
commit dba644d27aa6e9788327587da702e80a3c033ebc
Author: MehulBatra <[email protected]>
AuthorDate: Mon Feb 2 14:53:54 2026 +0530
[flink] Basic $binlog read support without pushdown optimizations for pk
table (#2525)
---
.../org/apache/fluss/metadata/TableDescriptor.java | 4 +
.../source/Flink118BinlogVirtualTableITCase.java | 21 ++
.../source/Flink119BinlogVirtualTableITCase.java | 21 ++
.../source/Flink120BinlogVirtualTableITCase.java | 21 ++
.../source/Flink22BinlogVirtualTableITCase.java | 21 ++
.../apache/fluss/flink/FlinkConnectorOptions.java | 8 +-
.../apache/fluss/flink/catalog/FlinkCatalog.java | 114 ++++++-
.../fluss/flink/catalog/FlinkTableFactory.java | 53 +++
...ableSource.java => BinlogFlinkTableSource.java} | 123 ++-----
.../flink/source/ChangelogFlinkTableSource.java | 18 +-
.../deserializer/BinlogDeserializationSchema.java | 85 +++++
.../flink/source/emitter/FlinkRecordEmitter.java | 26 +-
.../fluss/flink/utils/BinlogRowConverter.java | 174 ++++++++++
.../fluss/flink/utils/ChangelogRowConverter.java | 17 +-
.../fluss/flink/catalog/FlinkCatalogITCase.java | 52 ++-
.../flink/source/BinlogVirtualTableITCase.java | 362 +++++++++++++++++++++
.../flink/source/ChangelogVirtualTableITCase.java | 49 +--
.../fluss/flink/utils/BinlogRowConverterTest.java | 249 ++++++++++++++
.../flink/utils/ChangelogRowConverterTest.java | 18 +-
19 files changed, 1285 insertions(+), 151 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java
index 4c8ecbd79..9f18dd401 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java
@@ -65,6 +65,10 @@ public final class TableDescriptor implements Serializable {
public static final String LOG_OFFSET_COLUMN = "_log_offset";
public static final String COMMIT_TIMESTAMP_COLUMN = "_commit_timestamp";
+ // column names for $binlog virtual table nested row fields
+ public static final String BEFORE_COLUMN = "before";
+ public static final String AFTER_COLUMN = "after";
+
private final Schema schema;
private final @Nullable String comment;
private final List<String> partitionKeys;
diff --git
a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118BinlogVirtualTableITCase.java
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118BinlogVirtualTableITCase.java
new file mode 100644
index 000000000..7dcd9e45a
--- /dev/null
+++
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118BinlogVirtualTableITCase.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+/** IT case for {@link BinlogVirtualTableITCase} in Flink 1.18. */
+public class Flink118BinlogVirtualTableITCase extends BinlogVirtualTableITCase
{}
diff --git
a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119BinlogVirtualTableITCase.java
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119BinlogVirtualTableITCase.java
new file mode 100644
index 000000000..074a300f4
--- /dev/null
+++
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/source/Flink119BinlogVirtualTableITCase.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+/** IT case for {@link BinlogVirtualTableITCase} in Flink 1.19. */
+public class Flink119BinlogVirtualTableITCase extends BinlogVirtualTableITCase
{}
diff --git
a/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120BinlogVirtualTableITCase.java
b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120BinlogVirtualTableITCase.java
new file mode 100644
index 000000000..354f8f198
--- /dev/null
+++
b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120BinlogVirtualTableITCase.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+/** IT case for {@link BinlogVirtualTableITCase} in Flink 1.20. */
+public class Flink120BinlogVirtualTableITCase extends BinlogVirtualTableITCase
{}
diff --git
a/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22BinlogVirtualTableITCase.java
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22BinlogVirtualTableITCase.java
new file mode 100644
index 000000000..d73dad613
--- /dev/null
+++
b/fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/source/Flink22BinlogVirtualTableITCase.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+/** IT case for {@link BinlogVirtualTableITCase} in Flink 2.2. */
+public class Flink22BinlogVirtualTableITCase extends BinlogVirtualTableITCase
{}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
index c62292c69..6fd5d147f 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
@@ -245,7 +245,13 @@ public class FlinkConnectorOptions {
.withDescription(
"The serialized base64 bytes of refresh handler of
materialized table.");
- //
------------------------------------------------------------------------------------------
+ /** Internal option to indicate whether the base table is partitioned for
$binlog sources. */
+ public static final ConfigOption<Boolean> INTERNAL_BINLOG_IS_PARTITIONED =
+ ConfigOptions.key("_internal.binlog.is-partitioned")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Internal option: indicates whether the base table
is partitioned for $binlog virtual tables. Not part of public API.");
/** Startup mode for the fluss scanner, see {@link #SCAN_STARTUP_MODE}. */
public enum ScanStartupMode implements DescribedEnum {
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index c6c434889..5d81c5e5e 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -23,6 +23,7 @@ import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.flink.FlinkConnectorOptions;
import org.apache.fluss.flink.adapter.CatalogTableAdapter;
import org.apache.fluss.flink.lake.LakeFlinkCatalog;
import org.apache.fluss.flink.procedure.ProcedureManager;
@@ -39,6 +40,7 @@ import org.apache.fluss.utils.ExceptionUtils;
import org.apache.fluss.utils.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -73,6 +75,7 @@ import
org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.table.types.AbstractDataType;
import java.util.ArrayList;
import java.util.Collections;
@@ -315,11 +318,7 @@ public class FlinkCatalog extends AbstractCatalog {
return getVirtualChangelogTable(objectPath);
} else if (tableName.endsWith(BINLOG_TABLE_SUFFIX)
&& !tableName.contains(LAKE_TABLE_SPLITTER)) {
- // TODO: Implement binlog virtual table in future
- throw new UnsupportedOperationException(
- String.format(
- "$binlog virtual tables are not yet supported for
table %s",
- objectPath));
+ return getVirtualBinlogTable(objectPath);
}
TablePath tablePath = toTablePath(objectPath);
@@ -960,4 +959,109 @@ public class FlinkCatalog extends AbstractCatalog {
return builder.build();
}
+
+ /**
+ * Creates a virtual $binlog table by modifying the base table's schema to
include metadata
+ * columns and nested before/after ROW fields.
+ */
+ private CatalogBaseTable getVirtualBinlogTable(ObjectPath objectPath)
+ throws TableNotExistException, CatalogException {
+ // Extract the base table name (remove $binlog suffix)
+ String virtualTableName = objectPath.getObjectName();
+ String baseTableName =
+ virtualTableName.substring(
+ 0, virtualTableName.length() -
BINLOG_TABLE_SUFFIX.length());
+
+ // Get the base table
+ ObjectPath baseObjectPath = new
ObjectPath(objectPath.getDatabaseName(), baseTableName);
+ TablePath baseTablePath = toTablePath(baseObjectPath);
+
+ try {
+ // Retrieve base table info
+ TableInfo tableInfo = admin.getTableInfo(baseTablePath).get();
+
+ // $binlog is only supported for primary key tables
+ if (!tableInfo.hasPrimaryKey()) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "$binlog virtual tables are only supported for
primary key tables. "
+ + "Table %s does not have a primary
key.",
+ baseTablePath));
+ }
+
+ // Convert to Flink table
+ CatalogBaseTable catalogBaseTable =
FlinkConversions.toFlinkTable(tableInfo);
+
+ if (!(catalogBaseTable instanceof CatalogTable)) {
+ throw new UnsupportedOperationException(
+ "Virtual $binlog tables are only supported for regular
tables");
+ }
+
+ CatalogTable baseTable = (CatalogTable) catalogBaseTable;
+
+ // Build the binlog schema with nested before/after ROW columns
+ Schema originalSchema = baseTable.getUnresolvedSchema();
+ Schema binlogSchema = buildBinlogSchema(originalSchema);
+
+ // Copy options from base table
+ Map<String, String> newOptions = new
HashMap<>(baseTable.getOptions());
+ newOptions.put(BOOTSTRAP_SERVERS.key(), bootstrapServers);
+ newOptions.putAll(securityConfigs);
+
+ // Store whether the base table is partitioned for the table
source to use.
+ // Since binlog schema has nested columns, we can't use Flink's
partition key mechanism.
+ newOptions.put(
+ FlinkConnectorOptions.INTERNAL_BINLOG_IS_PARTITIONED.key(),
+ String.valueOf(!baseTable.getPartitionKeys().isEmpty()));
+
+ // Create a new CatalogTable with the binlog schema
+ // Binlog virtual tables don't have partition keys at the top level
+ return CatalogTableAdapter.toCatalogTable(
+ binlogSchema, baseTable.getComment(),
Collections.emptyList(), newOptions);
+
+ } catch (Exception e) {
+ Throwable t = ExceptionUtils.stripExecutionException(e);
+ if (t instanceof UnsupportedOperationException) {
+ throw (UnsupportedOperationException) t;
+ }
+ if (isTableNotExist(t)) {
+ throw new TableNotExistException(getName(), baseObjectPath);
+ } else {
+ throw new CatalogException(
+ String.format(
+ "Failed to get virtual binlog table %s in %s",
+ objectPath, getName()),
+ t);
+ }
+ }
+ }
+
+ private Schema buildBinlogSchema(Schema originalSchema) {
+ Schema.Builder builder = Schema.newBuilder();
+
+ // Add metadata columns
+ builder.column("_change_type", STRING().notNull());
+ builder.column("_log_offset", BIGINT().notNull());
+ builder.column("_commit_timestamp", TIMESTAMP_LTZ(3).notNull());
+
+ // Build nested ROW type from original columns for before/after fields
+ // Using UnresolvedField since physCol.getDataType() returns
AbstractDataType (unresolved)
+ List<DataTypes.UnresolvedField> rowFields = new ArrayList<>();
+ for (Schema.UnresolvedColumn col : originalSchema.getColumns()) {
+ if (col instanceof Schema.UnresolvedPhysicalColumn) {
+ Schema.UnresolvedPhysicalColumn physCol =
(Schema.UnresolvedPhysicalColumn) col;
+ rowFields.add(DataTypes.FIELD(physCol.getName(),
physCol.getDataType()));
+ }
+ }
+ AbstractDataType<?> nestedRowType =
+ DataTypes.ROW(rowFields.toArray(new
DataTypes.UnresolvedField[0]));
+
+ // Add before and after as nullable nested ROW columns
+ builder.column("before", nestedRowType);
+ builder.column("after", nestedRowType);
+
+ // Note: We don't copy primary keys or watermarks for virtual tables
+
+ return builder.build();
+ }
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index 7c99b6f4b..a7a5608de 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -24,6 +24,7 @@ import org.apache.fluss.flink.lake.LakeFlinkCatalog;
import org.apache.fluss.flink.lake.LakeTableFactory;
import org.apache.fluss.flink.sink.FlinkTableSink;
import org.apache.fluss.flink.sink.shuffle.DistributionMode;
+import org.apache.fluss.flink.source.BinlogFlinkTableSource;
import org.apache.fluss.flink.source.ChangelogFlinkTableSource;
import org.apache.fluss.flink.source.FlinkTableSource;
import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
@@ -93,6 +94,11 @@ public class FlinkTableFactory implements
DynamicTableSourceFactory, DynamicTabl
return createChangelogTableSource(context, tableIdentifier,
tableName);
}
+        // Check whether the table name carries the $binlog suffix
+ if (tableName.endsWith(FlinkCatalog.BINLOG_TABLE_SUFFIX)) {
+ return createBinlogTableSource(context, tableIdentifier,
tableName);
+ }
+
FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
final ReadableConfig tableOptions = helper.getOptions();
validateSourceOptions(tableOptions);
@@ -327,4 +333,51 @@ public class FlinkTableFactory implements
DynamicTableSourceFactory, DynamicTabl
partitionDiscoveryIntervalMs,
catalogTableOptions);
}
+
+ /** Creates a BinlogFlinkTableSource for $binlog virtual tables. */
+ private DynamicTableSource createBinlogTableSource(
+ Context context, ObjectIdentifier tableIdentifier, String
tableName) {
+ // Extract the base table name by removing the $binlog suffix
+ String baseTableName =
+ tableName.substring(
+ 0, tableName.length() -
FlinkCatalog.BINLOG_TABLE_SUFFIX.length());
+
+ boolean isStreamingMode =
+ context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+ == RuntimeExecutionMode.STREAMING;
+
+ // tableOutputType: [_change_type, _log_offset, _commit_timestamp,
before ROW<...>, after
+ // ROW<...>]
+ RowType tableOutputType = (RowType)
context.getPhysicalRowDataType().getLogicalType();
+
+ Map<String, String> catalogTableOptions =
context.getCatalogTable().getOptions();
+ FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
+ final ReadableConfig tableOptions = helper.getOptions();
+ validateSourceOptions(tableOptions);
+
+ ZoneId timeZone =
+ FlinkConnectorOptionsUtils.getLocalTimeZone(
+
context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE));
+ final FlinkConnectorOptionsUtils.StartupOptions startupOptions =
+ FlinkConnectorOptionsUtils.getStartupOptions(tableOptions,
timeZone);
+
+ // Check if the table is partitioned from the internal option
+ boolean isPartitioned =
+
tableOptions.get(FlinkConnectorOptions.INTERNAL_BINLOG_IS_PARTITIONED);
+
+ long partitionDiscoveryIntervalMs =
+ tableOptions
+
.get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
+ .toMillis();
+
+ return new BinlogFlinkTableSource(
+ TablePath.of(tableIdentifier.getDatabaseName(), baseTableName),
+ toFlussClientConfig(catalogTableOptions,
context.getConfiguration()),
+ tableOutputType,
+ isPartitioned,
+ isStreamingMode,
+ startupOptions,
+ partitionDiscoveryIntervalMs,
+ catalogTableOptions);
+ }
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java
similarity index 52%
copy from
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
copy to
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java
index 3883c4308..a5ec91bf3 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java
@@ -19,15 +19,13 @@ package org.apache.fluss.flink.source;
import org.apache.fluss.client.initializer.OffsetsInitializer;
import org.apache.fluss.config.Configuration;
-import
org.apache.fluss.flink.source.deserializer.ChangelogDeserializationSchema;
+import org.apache.fluss.flink.source.deserializer.BinlogDeserializationSchema;
import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
import org.apache.fluss.flink.utils.FlinkConversions;
-import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.types.RowType;
-import org.apache.flink.api.connector.source.Source;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
@@ -37,23 +35,18 @@ import org.apache.flink.table.types.logical.LogicalType;
import javax.annotation.Nullable;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-/** A Flink table source for the $changelog virtual table. */
-public class ChangelogFlinkTableSource implements ScanTableSource {
+/** A Flink table source for the $binlog virtual table. */
+public class BinlogFlinkTableSource implements ScanTableSource {
private final TablePath tablePath;
private final Configuration flussConfig;
- // The changelog output type (includes metadata columns: _change_type,
_log_offset,
- // _commit_timestamp)
- private final org.apache.flink.table.types.logical.RowType
changelogOutputType;
+ // The binlog output type (includes metadata + nested before/after ROW
columns)
+ private final org.apache.flink.table.types.logical.RowType
binlogOutputType;
+ // The data columns type extracted from the 'before' nested ROW
private final org.apache.flink.table.types.logical.RowType dataColumnsType;
- private final int[] partitionKeyIndexes;
+ private final boolean isPartitioned;
private final boolean streaming;
private final FlinkConnectorOptionsUtils.StartupOptions startupOptions;
private final long scanPartitionDiscoveryIntervalMs;
@@ -65,77 +58,50 @@ public class ChangelogFlinkTableSource implements
ScanTableSource {
@Nullable private Predicate partitionFilters;
- private static final Set<String> METADATA_COLUMN_NAMES =
- new HashSet<>(
- Arrays.asList(
- TableDescriptor.CHANGE_TYPE_COLUMN,
- TableDescriptor.LOG_OFFSET_COLUMN,
- TableDescriptor.COMMIT_TIMESTAMP_COLUMN));
-
- public ChangelogFlinkTableSource(
+ public BinlogFlinkTableSource(
TablePath tablePath,
Configuration flussConfig,
- org.apache.flink.table.types.logical.RowType changelogOutputType,
- int[] partitionKeyIndexes,
+ org.apache.flink.table.types.logical.RowType binlogOutputType,
+ boolean isPartitioned,
boolean streaming,
FlinkConnectorOptionsUtils.StartupOptions startupOptions,
long scanPartitionDiscoveryIntervalMs,
Map<String, String> tableOptions) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
- // The changelogOutputType already includes metadata columns from
FlinkCatalog
- this.changelogOutputType = changelogOutputType;
- this.partitionKeyIndexes = partitionKeyIndexes;
+ this.binlogOutputType = binlogOutputType;
+ this.isPartitioned = isPartitioned;
this.streaming = streaming;
this.startupOptions = startupOptions;
this.scanPartitionDiscoveryIntervalMs =
scanPartitionDiscoveryIntervalMs;
this.tableOptions = tableOptions;
- // Extract data columns by filtering out metadata columns by name
- this.dataColumnsType = extractDataColumnsType(changelogOutputType);
- this.producedDataType = changelogOutputType;
- }
-
- /**
- * Extracts the data columns type by removing the metadata columns from
the changelog output
- * type.
- */
- private org.apache.flink.table.types.logical.RowType
extractDataColumnsType(
- org.apache.flink.table.types.logical.RowType changelogType) {
- // Filter out metadata columns by name
- List<org.apache.flink.table.types.logical.RowType.RowField> dataFields
=
- changelogType.getFields().stream()
- .filter(field ->
!METADATA_COLUMN_NAMES.contains(field.getName()))
- .collect(Collectors.toList());
-
- return new org.apache.flink.table.types.logical.RowType(dataFields);
+ // Extract data columns from the 'before' nested ROW type (index 3)
+ // The binlog schema is: [_change_type, _log_offset,
_commit_timestamp, before, after]
+ this.dataColumnsType =
+ (org.apache.flink.table.types.logical.RowType)
binlogOutputType.getTypeAt(3);
+ this.producedDataType = binlogOutputType;
}
@Override
public ChangelogMode getChangelogMode() {
- // The $changelog virtual table always produces INSERT-only records.
- // All change types (+I, -U, +U, -D) are flattened into regular rows
return ChangelogMode.insertOnly();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext)
{
- // Create the Fluss row type for the data columns (without metadata)
+ // Create the Fluss row type for the data columns (the original table
columns)
RowType flussRowType =
FlinkConversions.toFlussRowType(dataColumnsType);
if (projectedFields != null) {
- // Adjust projection to account for metadata columns
- // TODO: Handle projection properly with metadata columns
flussRowType = flussRowType.project(projectedFields);
}
- // to capture all change types (+I, -U, +U, -D).
- // FULL mode reads snapshot first (no change types), so we use
EARLIEST for log-only
- // reading.
- // LATEST mode is supported for real-time changelog streaming from
current position.
+
+ // Determine the offsets initializer based on startup mode
OffsetsInitializer offsetsInitializer;
switch (startupOptions.startupMode) {
case EARLIEST:
case FULL:
- // For changelog, FULL mode should read all log records from
beginning
+ // For binlog, read all log records from the beginning
offsetsInitializer = OffsetsInitializer.earliest();
break;
case LATEST:
@@ -150,52 +116,33 @@ public class ChangelogFlinkTableSource implements
ScanTableSource {
"Unsupported startup mode: " +
startupOptions.startupMode);
}
- // Create the source with the changelog deserialization schema
+ // Create the source with the binlog deserialization schema
FlinkSource<RowData> source =
new FlinkSource<>(
flussConfig,
tablePath,
- // Changelog/binlog virtual tables are purely
log-based and don't have a
- // primary key, setting hasPrimaryKey "false" to
ensure the enumerator
- // fetches log-only splits (not snapshot splits),
which is the correct
- // behavior for virtual tables.
false,
- isPartitioned(),
+ isPartitioned,
flussRowType,
projectedFields,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
- new ChangelogDeserializationSchema(),
+ new BinlogDeserializationSchema(),
streaming,
partitionFilters,
- null); // Lake source not supported
-
- if (!streaming) {
- // Batch mode - changelog virtual tables read from log, not data
lake
- return new SourceProvider() {
- @Override
- public boolean isBounded() {
- return true;
- }
-
- @Override
- public Source<RowData, ?, ?> createSource() {
- return source;
- }
- };
- } else {
- return SourceProvider.of(source);
- }
+ null);
+
+ return SourceProvider.of(source);
}
@Override
public DynamicTableSource copy() {
- ChangelogFlinkTableSource copy =
- new ChangelogFlinkTableSource(
+ BinlogFlinkTableSource copy =
+ new BinlogFlinkTableSource(
tablePath,
flussConfig,
- changelogOutputType,
- partitionKeyIndexes,
+ binlogOutputType,
+ isPartitioned,
streaming,
startupOptions,
scanPartitionDiscoveryIntervalMs,
@@ -208,13 +155,9 @@ public class ChangelogFlinkTableSource implements
ScanTableSource {
@Override
public String asSummaryString() {
- return "FlussChangelogTableSource";
+ return "FlussBinlogTableSource";
}
- // TODO: Implement projection pushdown handling for metadata columns
+ // TODO: Implement projection pushdown handling for nested before/after
columns
// TODO: Implement filter pushdown
-
- private boolean isPartitioned() {
- return partitionKeyIndexes.length > 0;
- }
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
index 3883c4308..a034807d7 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
@@ -27,7 +27,6 @@ import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.types.RowType;
-import org.apache.flink.api.connector.source.Source;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
@@ -170,22 +169,7 @@ public class ChangelogFlinkTableSource implements
ScanTableSource {
partitionFilters,
null); // Lake source not supported
- if (!streaming) {
- // Batch mode - changelog virtual tables read from log, not data
lake
- return new SourceProvider() {
- @Override
- public boolean isBounded() {
- return true;
- }
-
- @Override
- public Source<RowData, ?, ?> createSource() {
- return source;
- }
- };
- } else {
- return SourceProvider.of(source);
- }
+ return SourceProvider.of(source);
}
@Override
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java
new file mode 100644
index 000000000..1114febb9
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/deserializer/BinlogDeserializationSchema.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.deserializer;
+
+import org.apache.fluss.flink.utils.BinlogRowConverter;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.types.RowType;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import javax.annotation.Nullable;
+
+import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkRowType;
+
+/**
+ * A deserialization schema that converts {@link LogRecord} objects to Flink's
{@link RowData}
+ * format with nested before/after row structure for the $binlog virtual table.
+ *
+ * <p>This schema is stateful: it buffers UPDATE_BEFORE (-U) records and
returns {@code null} for
+ * them. When the subsequent UPDATE_AFTER (+U) record arrives, it merges both
into a single binlog
+ * row. The {@link org.apache.fluss.flink.source.emitter.FlinkRecordEmitter}
handles null returns by
+ * skipping emission.
+ */
+public class BinlogDeserializationSchema implements
FlussDeserializationSchema<RowData> {
+
+ /**
+ * Converter responsible for transforming Fluss row data into Flink's
{@link RowData} format
+ * with nested before/after structure. Initialized during {@link
#open(InitializationContext)}.
+ */
+ private transient BinlogRowConverter converter;
+
+ /** Creates a new BinlogDeserializationSchema. */
+ public BinlogDeserializationSchema() {}
+
+ /** Initializes the deserialization schema. */
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ if (converter == null) {
+ this.converter = new BinlogRowConverter(context.getRowSchema());
+ }
+ }
+
+ /**
+ * Deserializes a {@link LogRecord} into a Flink {@link RowData} object
with nested before/after
+ * structure.
+ */
+ @Override
+ @Nullable
+ public RowData deserialize(LogRecord record) throws Exception {
+ if (converter == null) {
+ throw new IllegalStateException(
+ "Converter not initialized. The open() method must be
called before deserializing records.");
+ }
+ return converter.toBinlogRowData(record);
+ }
+
+ /**
+ * Returns the TypeInformation for the produced {@link RowData} type
including nested
+ * before/after ROW columns.
+ */
+ @Override
+ public TypeInformation<RowData> getProducedType(RowType rowSchema) {
+ // Build the output type with nested before/after ROW columns
+ org.apache.flink.table.types.logical.RowType outputType =
+
BinlogRowConverter.buildBinlogRowType(toFlinkRowType(rowSchema));
+ return InternalTypeInfo.of(outputType);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java
index 82a3ed87e..ba4c9d013 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java
@@ -73,8 +73,20 @@ public class FlinkRecordEmitter<OUT> implements
RecordEmitter<RecordAndPos, OUT,
}
processAndEmitRecord(scanRecord, sourceOutput);
} else if (splitState.isLogSplitState()) {
-
splitState.asLogSplitState().setNextOffset(recordAndPosition.record().logOffset()
+ 1);
- processAndEmitRecord(recordAndPosition.record(), sourceOutput);
+ // Attempt to process and emit the record.
+ // For $binlog, this returns true only when a complete row (or the
final part of
+ // a split) is emitted.
+ boolean emitted = processAndEmitRecord(recordAndPosition.record(),
sourceOutput);
+
+ if (emitted) {
+ // Only advance the offset in state if the record was
successfully emitted.
+ // This ensures that if a crash occurs mid-update (between
BEFORE and AFTER),
+ // the source will re-read the same log offset upon recovery,
+ // allowing the BinlogDeserializationSchema to correctly
reconstruct the state.
+ splitState
+ .asLogSplitState()
+ .setNextOffset(recordAndPosition.record().logOffset()
+ 1);
+ }
} else if (splitState.isLakeSplit()) {
if (lakeRecordRecordEmitter == null) {
lakeRecordRecordEmitter = new
LakeRecordRecordEmitter<>(this::processAndEmitRecord);
@@ -85,7 +97,13 @@ public class FlinkRecordEmitter<OUT> implements
RecordEmitter<RecordAndPos, OUT,
}
}
- private void processAndEmitRecord(ScanRecord scanRecord, SourceOutput<OUT>
sourceOutput) {
+ /**
+ * Processes and emits a record.
+ *
+ * @return true if a record was emitted, false if deserialize returned
null (e.g., for $binlog
+ * UPDATE_BEFORE records that are buffered pending their UPDATE_AFTER
pair)
+ */
+ private boolean processAndEmitRecord(ScanRecord scanRecord,
SourceOutput<OUT> sourceOutput) {
OUT record;
try {
record = deserializationSchema.deserialize(scanRecord);
@@ -102,6 +120,8 @@ public class FlinkRecordEmitter<OUT> implements
RecordEmitter<RecordAndPos, OUT,
} else {
sourceOutput.collect(record);
}
+ return true;
}
+ return false;
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java
new file mode 100644
index 000000000..3a9f2c941
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/BinlogRowConverter.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.types.RowType;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A converter that transforms Fluss's {@link LogRecord} to Flink's {@link
RowData} with nested
+ * before/after row structure for the $binlog virtual table.
+ */
+public class BinlogRowConverter implements RecordToFlinkRowConverter {
+
+ private final FlussRowToFlinkRowConverter baseConverter;
+ private final org.apache.flink.table.types.logical.RowType producedType;
+
+ /**
+ * Buffer for the UPDATE_BEFORE (-U) record pending merge with the next
UPDATE_AFTER (+U)
+ * record. Null when no update is in progress.
+ */
+ @Nullable private LogRecord pendingUpdateBefore;
+
+ /** Creates a new BinlogRowConverter. */
+ public BinlogRowConverter(RowType rowType) {
+ this.baseConverter = new FlussRowToFlinkRowConverter(rowType);
+ this.producedType =
buildBinlogRowType(FlinkConversions.toFlinkRowType(rowType));
+ }
+
+ /** Converts a LogRecord to a binlog RowData with nested before/after
structure. */
+ @Nullable
+ public RowData toBinlogRowData(LogRecord record) {
+ ChangeType changeType = record.getChangeType();
+
+ switch (changeType) {
+ case INSERT:
+ return buildBinlogRow(
+ "insert",
+ record.logOffset(),
+ record.timestamp(),
+ null,
+ baseConverter.toFlinkRowData(record.getRow()));
+
+ case UPDATE_BEFORE:
+ // Buffer the -U record and return null.
+ // FlinkRecordEmitter.processAndEmitRecord() skips null
results.
+ this.pendingUpdateBefore = record;
+ return null;
+
+ case UPDATE_AFTER:
+ // Merge with the buffered -U record
+ if (pendingUpdateBefore == null) {
+ throw new IllegalStateException(
+ "Received UPDATE_AFTER (+U) without a preceding
UPDATE_BEFORE (-U) record. "
+ + "This indicates a corrupted log
sequence.");
+ }
+ RowData beforeRow =
baseConverter.toFlinkRowData(pendingUpdateBefore.getRow());
+ RowData afterRow =
baseConverter.toFlinkRowData(record.getRow());
+ // Use offset and timestamp from the -U record (first entry of
update pair)
+ long offset = pendingUpdateBefore.logOffset();
+ long timestamp = pendingUpdateBefore.timestamp();
+ pendingUpdateBefore = null;
+ return buildBinlogRow("update", offset, timestamp, beforeRow,
afterRow);
+
+ case DELETE:
+ return buildBinlogRow(
+ "delete",
+ record.logOffset(),
+ record.timestamp(),
+ baseConverter.toFlinkRowData(record.getRow()),
+ null);
+
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "$binlog virtual table does not support change
type: %s. "
+ + "$binlog is only supported for
primary key tables.",
+ changeType));
+ }
+ }
+
+ @Override
+ @Nullable
+ public RowData convert(LogRecord record) {
+ return toBinlogRowData(record);
+ }
+
+ @Override
+ public org.apache.flink.table.types.logical.RowType getProducedType() {
+ return producedType;
+ }
+
+ /**
+ * Builds a binlog row with 5 fields: _change_type, _log_offset,
_commit_timestamp, before,
+ * after.
+ */
+ private RowData buildBinlogRow(
+ String changeType,
+ long offset,
+ long timestamp,
+ @Nullable RowData before,
+ @Nullable RowData after) {
+ GenericRowData row = new GenericRowData(5);
+ row.setField(0, StringData.fromString(changeType));
+ row.setField(1, offset);
+ row.setField(2, TimestampData.fromEpochMillis(timestamp));
+ row.setField(3, before);
+ row.setField(4, after);
+ row.setRowKind(RowKind.INSERT);
+ return row;
+ }
+
+ /**
+ * Builds the Flink RowType for the binlog virtual table with nested
before/after ROW columns.
+ */
+ public static org.apache.flink.table.types.logical.RowType
buildBinlogRowType(
+ org.apache.flink.table.types.logical.RowType originalType) {
+ List<org.apache.flink.table.types.logical.RowType.RowField> fields =
new ArrayList<>();
+
+ // Add metadata columns
+ fields.add(
+ new org.apache.flink.table.types.logical.RowType.RowField(
+ TableDescriptor.CHANGE_TYPE_COLUMN, new
VarCharType(false, 6)));
+ fields.add(
+ new org.apache.flink.table.types.logical.RowType.RowField(
+ TableDescriptor.LOG_OFFSET_COLUMN, new
BigIntType(false)));
+ fields.add(
+ new org.apache.flink.table.types.logical.RowType.RowField(
+ TableDescriptor.COMMIT_TIMESTAMP_COLUMN,
+ new LocalZonedTimestampType(false, 3)));
+
+ // Add nested before and after ROW columns (nullable at the ROW level)
+ org.apache.flink.table.types.logical.RowType nullableRowType =
+ new org.apache.flink.table.types.logical.RowType(true,
originalType.getFields());
+ fields.add(
+ new org.apache.flink.table.types.logical.RowType.RowField(
+ TableDescriptor.BEFORE_COLUMN, nullableRowType));
+ fields.add(
+ new org.apache.flink.table.types.logical.RowType.RowField(
+ TableDescriptor.AFTER_COLUMN, nullableRowType));
+
+ return new org.apache.flink.table.types.logical.RowType(fields);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java
index 6b8081a6d..a2b339da5 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/ChangelogRowConverter.java
@@ -86,8 +86,19 @@ public class ChangelogRowConverter implements
RecordToFlinkRowConverter {
/** Converts a Fluss ChangeType to its string representation for the
changelog virtual table. */
private String convertChangeTypeToString(ChangeType changeType) {
- // Use the short string representation from ChangeType
- return changeType.shortString();
+ switch (changeType) {
+ case APPEND_ONLY:
+ case INSERT:
+ return "insert";
+ case UPDATE_BEFORE:
+ return "update_before";
+ case UPDATE_AFTER:
+ return "update_after";
+ case DELETE:
+ return "delete";
+ default:
+ throw new IllegalArgumentException("Unknown change type: " +
changeType);
+ }
}
/**
@@ -103,7 +114,7 @@ public class ChangelogRowConverter implements
RecordToFlinkRowConverter {
// Add metadata columns first (using centralized constants from
TableDescriptor)
fields.add(
new org.apache.flink.table.types.logical.RowType.RowField(
- TableDescriptor.CHANGE_TYPE_COLUMN, new
VarCharType(false, 2)));
+ TableDescriptor.CHANGE_TYPE_COLUMN, new
VarCharType(false, 13)));
fields.add(
new org.apache.flink.table.types.logical.RowType.RowField(
TableDescriptor.LOG_OFFSET_COLUMN, new
BigIntType(false)));
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index 58e78ccd2..f4ffd61de 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -27,6 +27,7 @@ import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.InvalidConfigException;
import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.flink.FlinkConnectorOptions;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
@@ -914,7 +915,7 @@ abstract class FlinkCatalogITCase {
// Verify options are inherited from base table
assertThat(changelogTable.getOptions()).containsEntry("bucket.num",
"1");
- // Verify $changelog log tables (append-only with +A change type)
+ // Verify $changelog log tables (append-only with insert change type)
tEnv.executeSql("CREATE TABLE log_table_for_changelog (id INT, name
STRING)");
CatalogTable logChangelogTable =
@@ -935,6 +936,55 @@ abstract class FlinkCatalogITCase {
assertThat(logChangelogTable.getUnresolvedSchema()).isEqualTo(expectedLogSchema);
}
    /**
     * Verifies the schema and options that the catalog exposes for the {@code $binlog}
     * virtual table of a partitioned primary key table, and that requesting {@code $binlog}
     * for a log table (no primary key) is rejected.
     */
    @Test
    void testGetBinlogVirtualTable() throws Exception {
        // Create a primary key table with partition
        tEnv.executeSql(
                "CREATE TABLE pk_table_for_binlog ("
                        + " id INT NOT NULL,"
                        + " name STRING NOT NULL,"
                        + " amount BIGINT,"
                        + " PRIMARY KEY (id, name) NOT ENFORCED"
                        + ") PARTITIONED BY (name) "
                        + "WITH ('bucket.num' = '1')");

        // Get the $binlog virtual table via catalog API
        CatalogTable binlogTable =
                (CatalogTable)
                        catalog.getTable(new ObjectPath(DEFAULT_DB, "pk_table_for_binlog$binlog"));

        // use string representation for assertion to simplify the unresolved schema comparison
        assertThat(binlogTable.getUnresolvedSchema().toString())
                .isEqualTo(
                        "(\n"
                                + "  `_change_type` STRING NOT NULL,\n"
                                + "  `_log_offset` BIGINT NOT NULL,\n"
                                + "  `_commit_timestamp` TIMESTAMP_LTZ(3) NOT NULL,\n"
                                + "  `before` [ROW<id INT NOT NULL, name STRING NOT NULL, amount BIGINT>],\n"
                                + "  `after` [ROW<id INT NOT NULL, name STRING NOT NULL, amount BIGINT>]\n"
                                + ")");

        // Binlog virtual tables have empty partition keys (columns are nested)
        assertThat(binlogTable.getPartitionKeys()).isEmpty();

        // Partition info is stored as an internal boolean flag
        assertThat(binlogTable.getOptions())
                .containsEntry(FlinkConnectorOptions.INTERNAL_BINLOG_IS_PARTITIONED.key(), "true");

        // Verify options are inherited from base table
        assertThat(binlogTable.getOptions()).containsEntry("bucket.num", "1");

        // Verify $binlog is NOT supported for log tables (no primary key)
        tEnv.executeSql("CREATE TABLE log_table_for_binlog (id INT, name STRING)");

        assertThatThrownBy(
                        () ->
                                catalog.getTable(
                                        new ObjectPath(DEFAULT_DB, "log_table_for_binlog$binlog")))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("only supported for primary key tables");
    }
+
/**
* Before Flink 2.1, the {@link Schema} did not include an index field.
Starting from Flink 2.1,
* Flink introduced the concept of an index, and in Fluss, the primary key
is considered as an
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java
new file mode 100644
index 000000000..2fa306f88
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.utils.clock.ManualClock;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
+import static
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
/**
 * Integration test for $binlog virtual table functionality.
 *
 * <p>Tables are created with a single bucket and the embedded cluster runs on a
 * {@link ManualClock}, so log offsets and commit timestamps in the expected rows are
 * fully deterministic.
 */
abstract class BinlogVirtualTableITCase extends AbstractTestBase {

    // Shared with the cluster extension below so commit timestamps are deterministic.
    protected static final ManualClock CLOCK = new ManualClock();

    @RegisterExtension
    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
            FlussClusterExtension.builder()
                    .setClusterConf(new Configuration())
                    .setNumOfTabletServers(1)
                    .setClock(CLOCK)
                    .build();

    static final String CATALOG_NAME = "testcatalog";
    static final String DEFAULT_DB = "test_binlog_db";
    protected StreamExecutionEnvironment execEnv;
    protected StreamTableEnvironment tEnv;
    protected static Connection conn;
    protected static Admin admin;

    protected static Configuration clientConf;

    @BeforeAll
    protected static void beforeAll() {
        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
        conn = ConnectionFactory.createConnection(clientConf);
        admin = conn.getAdmin();
    }

    @BeforeEach
    void before() {
        // Initialize Flink environment
        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        tEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode());

        // Initialize catalog and database
        String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
        tEnv.executeSql(
                String.format(
                        "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
        tEnv.executeSql("use catalog " + CATALOG_NAME);
        // Parallelism 1 keeps result ordering deterministic for the exact-row assertions.
        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        tEnv.executeSql("create database " + DEFAULT_DB);
        tEnv.useDatabase(DEFAULT_DB);
        // reset clock before each test
        CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS);
    }

    @AfterEach
    void after() {
        tEnv.useDatabase(BUILTIN_DATABASE);
        tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
    }

    /** Deletes rows from a primary key table using the proper delete API. */
    protected static void deleteRows(
            Connection connection, TablePath tablePath, List<InternalRow> rows) throws Exception {
        try (Table table = connection.getTable(tablePath)) {
            UpsertWriter writer = table.newUpsert().createWriter();
            for (InternalRow row : rows) {
                writer.delete(row);
            }
            writer.flush();
        }
    }

    /** DESCRIBE on the $binlog table must show the 3 metadata columns plus nested rows. */
    @Test
    public void testDescribeBinlogTable() throws Exception {
        // Create a table with various data types to test complex schema
        tEnv.executeSql(
                "CREATE TABLE describe_test ("
                        + " id INT NOT NULL,"
                        + " name STRING,"
                        + " amount BIGINT,"
                        + " PRIMARY KEY (id) NOT ENFORCED"
                        + ")");

        // Test DESCRIBE on binlog virtual table
        CloseableIterator<Row> describeResult =
                tEnv.executeSql("DESCRIBE describe_test$binlog").collect();

        List<String> schemaRows = new ArrayList<>();
        while (describeResult.hasNext()) {
            schemaRows.add(describeResult.next().toString());
        }

        // Should have 5 columns: _change_type, _log_offset, _commit_timestamp, before, after
        assertThat(schemaRows).hasSize(5);

        // Verify metadata columns are listed first
        assertThat(schemaRows.get(0))
                .isEqualTo("+I[_change_type, STRING, false, null, null, null]");
        assertThat(schemaRows.get(1)).isEqualTo("+I[_log_offset, BIGINT, false, null, null, null]");
        assertThat(schemaRows.get(2))
                .isEqualTo("+I[_commit_timestamp, TIMESTAMP_LTZ(3), false, null, null, null]");

        // Verify before and after are ROW types with original columns
        assertThat(schemaRows.get(3))
                .isEqualTo(
                        "+I[before, ROW<`id` INT NOT NULL, `name` STRING, `amount` BIGINT>, true, null, null, null]");
        assertThat(schemaRows.get(4))
                .isEqualTo(
                        "+I[after, ROW<`id` INT NOT NULL, `name` STRING, `amount` BIGINT>, true, null, null, null]");
    }

    /** $binlog must be rejected for tables without a primary key. */
    @Test
    public void testBinlogUnsupportedForLogTable() throws Exception {
        // Create a log table (no primary key)
        tEnv.executeSql(
                "CREATE TABLE log_table ("
                        + " event_id INT,"
                        + " event_type STRING"
                        + ") WITH ('bucket.num' = '1')");

        // $binlog should fail for log tables
        assertThatThrownBy(() -> tEnv.executeSql("DESCRIBE log_table$binlog").collect())
                .hasMessageContaining("only supported for primary key tables");
    }

    /** Exercises insert/update/delete and checks the merged binlog rows and offsets. */
    @Test
    public void testBinlogWithAllChangeTypes() throws Exception {
        // Create a primary key table with 1 bucket for consistent log_offset numbers
        tEnv.executeSql(
                "CREATE TABLE binlog_test ("
                        + " id INT NOT NULL,"
                        + " name STRING,"
                        + " amount BIGINT,"
                        + " PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ('bucket.num' = '1')");

        TablePath tablePath = TablePath.of(DEFAULT_DB, "binlog_test");

        // Start binlog scan
        String query =
                "SELECT _change_type, _log_offset, "
                        + "before.id, before.name, before.amount, "
                        + "after.id, after.name, after.amount "
                        + "FROM binlog_test$binlog";
        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();

        // Test INSERT
        CLOCK.advanceTime(Duration.ofMillis(1000));
        writeRows(
                conn,
                tablePath,
                Arrays.asList(row(1, "Item-1", 100L), row(2, "Item-2", 200L)),
                false);

        // Collect inserts - each INSERT produces one binlog row
        List<String> insertResults = collectRowsWithTimeout(rowIter, 2, false);
        assertThat(insertResults).hasSize(2);

        // INSERT: before=null, after=row data
        // Format: +I[_change_type, _log_offset, before.id, before.name, before.amount,
        // after.id, after.name, after.amount]
        assertThat(insertResults.get(0))
                .isEqualTo("+I[insert, 0, null, null, null, 1, Item-1, 100]");
        assertThat(insertResults.get(1))
                .isEqualTo("+I[insert, 1, null, null, null, 2, Item-2, 200]");

        // Test UPDATE - should merge -U and +U into single binlog row
        CLOCK.advanceTime(Duration.ofMillis(1000));
        writeRows(conn, tablePath, Arrays.asList(row(1, "Item-1-Updated", 150L)), false);

        // UPDATE produces ONE binlog row (not two like changelog)
        List<String> updateResults = collectRowsWithTimeout(rowIter, 1, false);
        assertThat(updateResults).hasSize(1);

        // UPDATE: before=old row, after=new row, offset=from -U record
        assertThat(updateResults.get(0))
                .isEqualTo("+I[update, 2, 1, Item-1, 100, 1, Item-1-Updated, 150]");

        // Test DELETE
        CLOCK.advanceTime(Duration.ofMillis(1000));
        deleteRows(conn, tablePath, Arrays.asList(row(2, "Item-2", 200L)));

        // DELETE produces one binlog row
        // (offset 4: the update consumed offsets 2 and 3 for its -U/+U pair)
        List<String> deleteResults = collectRowsWithTimeout(rowIter, 1, true);
        assertThat(deleteResults).hasSize(1);

        // DELETE: before=row data, after=null
        assertThat(deleteResults.get(0))
                .isEqualTo("+I[delete, 4, 2, Item-2, 200, null, null, null]");
    }

    /** SELECT * must surface the nested before/after rows and the commit timestamp. */
    @Test
    public void testBinlogSelectStar() throws Exception {
        // Test SELECT * which returns the full binlog structure
        tEnv.executeSql(
                "CREATE TABLE star_test ("
                        + " id INT NOT NULL,"
                        + " name STRING,"
                        + " PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ('bucket.num' = '1')");

        TablePath tablePath = TablePath.of(DEFAULT_DB, "star_test");

        String query = "SELECT * FROM star_test$binlog";
        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();

        // Insert a row
        CLOCK.advanceTime(Duration.ofMillis(1000));
        writeRows(conn, tablePath, Arrays.asList(row(1, "Alice")), false);

        List<String> results = collectRowsWithTimeout(rowIter, 1, true);
        assertThat(results).hasSize(1);

        // SELECT * returns: _change_type, _log_offset, _commit_timestamp, before, after
        // before is null for INSERT, after contains the row
        assertThat(results.get(0))
                .isEqualTo("+I[insert, 0, 1970-01-01T00:00:01Z, null, +I[1, Alice]]");
    }

    /** Binlog reads must work across partitions of a partitioned primary key table. */
    @Test
    public void testBinlogWithPartitionedTable() throws Exception {
        // Create a partitioned primary key table
        tEnv.executeSql(
                "CREATE TABLE partitioned_binlog_test ("
                        + " id INT NOT NULL,"
                        + " name STRING,"
                        + " region STRING NOT NULL,"
                        + " PRIMARY KEY (id, region) NOT ENFORCED"
                        + ") PARTITIONED BY (region) WITH ('bucket.num' = '1')");

        // Insert data into different partitions using Flink SQL
        CLOCK.advanceTime(Duration.ofMillis(100));
        tEnv.executeSql(
                        "INSERT INTO partitioned_binlog_test VALUES "
                                + "(1, 'Item-1', 'us'), "
                                + "(2, 'Item-2', 'eu')")
                .await();

        // Query binlog with nested field access
        String query =
                "SELECT _change_type, after.id, after.name, after.region "
                        + "FROM partitioned_binlog_test$binlog";
        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();

        List<String> results = collectRowsWithTimeout(rowIter, 2, false);
        // Sort results for deterministic assertion (partitions may return in any order)
        Collections.sort(results);
        assertThat(results)
                .isEqualTo(Arrays.asList("+I[insert, 1, Item-1, us]", "+I[insert, 2, Item-2, eu]"));

        // Update a record in a specific partition
        CLOCK.advanceTime(Duration.ofMillis(100));
        tEnv.executeSql("INSERT INTO partitioned_binlog_test VALUES (1, 'Item-1-Updated', 'us')")
                .await();

        List<String> updateResults = collectRowsWithTimeout(rowIter, 1, true);
        assertThat(updateResults).hasSize(1);
        assertThat(updateResults.get(0)).isEqualTo("+I[update, 1, Item-1-Updated, us]");
    }

    /** Verifies scan.startup.mode='earliest' and 'timestamp' hints against the binlog. */
    @Test
    public void testBinlogScanStartupMode() throws Exception {
        // Create a primary key table with 1 bucket
        tEnv.executeSql(
                "CREATE TABLE startup_binlog_test ("
                        + " id INT NOT NULL,"
                        + " name STRING,"
                        + " PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ('bucket.num' = '1')");

        TablePath tablePath = TablePath.of(DEFAULT_DB, "startup_binlog_test");

        // Write first batch (clock at t=100ms after the advance below)
        CLOCK.advanceTime(Duration.ofMillis(100));
        writeRows(conn, tablePath, Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3")), false);

        // Write second batch (clock at t=200ms)
        CLOCK.advanceTime(Duration.ofMillis(100));
        writeRows(conn, tablePath, Arrays.asList(row(4, "v4"), row(5, "v5")), false);

        // Test scan.startup.mode='earliest' - should read all records from beginning
        String optionsEarliest = " /*+ OPTIONS('scan.startup.mode' = 'earliest') */";
        String queryEarliest =
                "SELECT _change_type, after.id, after.name FROM startup_binlog_test$binlog"
                        + optionsEarliest;
        CloseableIterator<Row> rowIterEarliest = tEnv.executeSql(queryEarliest).collect();
        List<String> earliestResults = collectRowsWithTimeout(rowIterEarliest, 5, true);
        assertThat(earliestResults)
                .isEqualTo(
                        Arrays.asList(
                                "+I[insert, 1, v1]",
                                "+I[insert, 2, v2]",
                                "+I[insert, 3, v3]",
                                "+I[insert, 4, v4]",
                                "+I[insert, 5, v5]"));

        // Test scan.startup.mode='timestamp' - should read from specific timestamp
        // (timestamp 150 falls between the two batches, so only batch two is read)
        String optionsTimestamp =
                " /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '150') */";
        String queryTimestamp =
                "SELECT _change_type, after.id, after.name FROM startup_binlog_test$binlog"
                        + optionsTimestamp;
        CloseableIterator<Row> rowIterTimestamp = tEnv.executeSql(queryTimestamp).collect();
        List<String> timestampResults = collectRowsWithTimeout(rowIterTimestamp, 2, true);
        assertThat(timestampResults)
                .isEqualTo(Arrays.asList("+I[insert, 4, v4]", "+I[insert, 5, v5]"));
    }
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
index 2b9111af8..342772996 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
@@ -97,7 +97,7 @@ abstract class ChangelogVirtualTableITCase extends
AbstractTestBase {
"create catalog %s with ('type' = 'fluss', '%s' =
'%s')",
CATALOG_NAME, BOOTSTRAP_SERVERS.key(),
bootstrapServers));
tEnv.executeSql("use catalog " + CATALOG_NAME);
-
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
2);
+
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
1);
tEnv.executeSql("create database " + DEFAULT_DB);
tEnv.useDatabase(DEFAULT_DB);
// reset clock before each test
@@ -220,16 +220,17 @@ abstract class ChangelogVirtualTableITCase extends
AbstractTestBase {
assertThat(results).hasSize(2);
// Format: +I[_change_type, _log_offset, _commit_timestamp, event_id,
event_type]
- // Log tables use +A (append-only) change type
- assertThat(results.get(0)).isEqualTo("+I[+A, 0, 1970-01-01T00:00:01Z,
1, click]");
- assertThat(results.get(1)).isEqualTo("+I[+A, 1, 1970-01-01T00:00:01Z,
2, view]");
+ // Log tables use insert (append-only) change type
+ assertThat(results.get(0)).isEqualTo("+I[insert, 0,
1970-01-01T00:00:01Z, 1, click]");
+ assertThat(results.get(1)).isEqualTo("+I[insert, 1,
1970-01-01T00:00:01Z, 2, view]");
// Insert more data with new timestamp
CLOCK.advanceTime(Duration.ofMillis(1000));
writeRows(conn, tablePath, Arrays.asList(row(3, "purchase")), true);
List<String> moreResults = collectRowsWithTimeout(rowIter, 1, true);
- assertThat(moreResults.get(0)).isEqualTo("+I[+A, 2,
1970-01-01T00:00:02Z, 3, purchase]");
+ assertThat(moreResults.get(0))
+ .isEqualTo("+I[insert, 2, 1970-01-01T00:00:02Z, 3, purchase]");
}
@Test
@@ -254,7 +255,7 @@ abstract class ChangelogVirtualTableITCase extends
AbstractTestBase {
CLOCK.advanceTime(Duration.ofMillis(100));
writeRows(conn, tablePath, Arrays.asList(row(1, "Item-1", 100L,
"Desc-1")), false);
List<String> insertResult = collectRowsWithTimeout(rowIter, 1, false);
- assertThat(insertResult.get(0)).isEqualTo("+I[+I, 1, Item-1]");
+ assertThat(insertResult.get(0)).isEqualTo("+I[insert, 1, Item-1]");
// Test UPDATE
CLOCK.advanceTime(Duration.ofMillis(100));
@@ -264,15 +265,15 @@ abstract class ChangelogVirtualTableITCase extends
AbstractTestBase {
Arrays.asList(row(1, "Item-1-Updated", 150L,
"Desc-1-Updated")),
false);
List<String> updateResults = collectRowsWithTimeout(rowIter, 2, false);
- assertThat(updateResults.get(0)).isEqualTo("+I[-U, 1, Item-1]");
- assertThat(updateResults.get(1)).isEqualTo("+I[+U, 1,
Item-1-Updated]");
+ assertThat(updateResults.get(0)).isEqualTo("+I[update_before, 1,
Item-1]");
+ assertThat(updateResults.get(1)).isEqualTo("+I[update_after, 1,
Item-1-Updated]");
// Test DELETE
CLOCK.advanceTime(Duration.ofMillis(100));
deleteRows(
conn, tablePath, Arrays.asList(row(1, "Item-1-Updated", 150L,
"Desc-1-Updated")));
List<String> deleteResult = collectRowsWithTimeout(rowIter, 1, true);
- assertThat(deleteResult.get(0)).isEqualTo("+I[-D, 1, Item-1-Updated]");
+ assertThat(deleteResult.get(0)).isEqualTo("+I[delete, 1,
Item-1-Updated]");
}
@Test
@@ -304,20 +305,20 @@ abstract class ChangelogVirtualTableITCase extends
AbstractTestBase {
// With ManualClock and 1 bucket, we can assert exact row values
// Format: +I[_change_type, _log_offset, _commit_timestamp, id, name,
amount]
- assertThat(results.get(0)).isEqualTo("+I[+I, 0, 1970-01-01T00:00:01Z,
1, Item-1, 100]");
- assertThat(results.get(1)).isEqualTo("+I[+I, 1, 1970-01-01T00:00:01Z,
2, Item-2, 200]");
+ assertThat(results.get(0)).isEqualTo("+I[insert, 0,
1970-01-01T00:00:01Z, 1, Item-1, 100]");
+ assertThat(results.get(1)).isEqualTo("+I[insert, 1,
1970-01-01T00:00:01Z, 2, Item-2, 200]");
// Test UPDATE operation with new timestamp
CLOCK.advanceTime(Duration.ofMillis(1000));
writeRows(conn, tablePath, Arrays.asList(row(1, "Item-1-Updated",
150L)), false);
- // Collect update records (should get -U and +U)
+ // Collect update records (should get update_before and update_after)
List<String> updateResults = collectRowsWithTimeout(rowIter, 2, false);
assertThat(updateResults).hasSize(2);
assertThat(updateResults.get(0))
- .isEqualTo("+I[-U, 2, 1970-01-01T00:00:02Z, 1, Item-1, 100]");
+ .isEqualTo("+I[update_before, 2, 1970-01-01T00:00:02Z, 1,
Item-1, 100]");
assertThat(updateResults.get(1))
- .isEqualTo("+I[+U, 3, 1970-01-01T00:00:02Z, 1, Item-1-Updated,
150]");
+ .isEqualTo("+I[update_after, 3, 1970-01-01T00:00:02Z, 1,
Item-1-Updated, 150]");
// Test DELETE operation with new timestamp
CLOCK.advanceTime(Duration.ofMillis(1000));
@@ -327,7 +328,7 @@ abstract class ChangelogVirtualTableITCase extends
AbstractTestBase {
List<String> deleteResult = collectRowsWithTimeout(rowIter, 1, true);
assertThat(deleteResult).hasSize(1);
assertThat(deleteResult.get(0))
- .isEqualTo("+I[-D, 4, 1970-01-01T00:00:03Z, 2, Item-2, 200]");
+ .isEqualTo("+I[delete, 4, 1970-01-01T00:00:03Z, 2, Item-2,
200]");
}
@Test
@@ -361,7 +362,7 @@ abstract class ChangelogVirtualTableITCase extends
AbstractTestBase {
assertThat(earliestResults).hasSize(5);
// All should be INSERT change types
for (String result : earliestResults) {
- assertThat(result).startsWith("+I[+I,");
+ assertThat(result).startsWith("+I[insert,");
}
// 2. Test scan.startup.mode='timestamp' - should read records from
specific timestamp
@@ -374,9 +375,9 @@ abstract class ChangelogVirtualTableITCase extends
AbstractTestBase {
assertThat(timestampResults).hasSize(2);
// Should contain records from batch2 only
assertThat(timestampResults)
- .containsExactlyInAnyOrder(
- "+I[+I, 3, 1970-01-01T00:00:00.200Z, 4, v4]",
- "+I[+I, 4, 1970-01-01T00:00:00.200Z, 5, v5]");
+ .containsExactly(
+ "+I[insert, 3, 1970-01-01T00:00:00.200Z, 4, v4]",
+ "+I[insert, 4, 1970-01-01T00:00:00.200Z, 5, v5]");
}
@Test
@@ -406,15 +407,19 @@ abstract class ChangelogVirtualTableITCase extends
AbstractTestBase {
// Collect initial inserts
List<String> results = collectRowsWithTimeout(rowIter, 3, false);
assertThat(results)
- .containsExactlyInAnyOrder(
- "+I[+I, 1, Item-1, us]", "+I[+I, 2, Item-2, us]",
"+I[+I, 3, Item-3, eu]");
+ .containsExactly(
+ "+I[insert, 1, Item-1, us]",
+ "+I[insert, 2, Item-2, us]",
+ "+I[insert, 3, Item-3, eu]");
// Update a record in a specific partition
CLOCK.advanceTime(Duration.ofMillis(100));
tEnv.executeSql("INSERT INTO partitioned_test VALUES (1,
'Item-1-Updated', 'us')").await();
List<String> updateResults = collectRowsWithTimeout(rowIter, 2, false);
assertThat(updateResults)
- .containsExactly("+I[-U, 1, Item-1, us]", "+I[+U, 1,
Item-1-Updated, us]");
+ .containsExactly(
+ "+I[update_before, 1, Item-1, us]",
+ "+I[update_after, 1, Item-1-Updated, us]");
rowIter.close();
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java
new file mode 100644
index 000000000..a09dd7c41
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/BinlogRowConverterTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.GenericRecord;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.row.indexed.IndexedRowWriter;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit test for {@link BinlogRowConverter}. */
+class BinlogRowConverterTest {
+
+ private RowType testRowType;
+ private BinlogRowConverter converter;
+
+ @BeforeEach
+ void setUp() {
+ // Create a simple test table schema: (id INT, name STRING, amount
BIGINT)
+ testRowType =
+ RowType.builder()
+ .field("id", DataTypes.INT())
+ .field("name", DataTypes.STRING())
+ .field("amount", DataTypes.BIGINT())
+ .build();
+
+ converter = new BinlogRowConverter(testRowType);
+ }
+
+ @Test
+ void testConvertInsertRecord() throws Exception {
+ LogRecord record = createLogRecord(ChangeType.INSERT, 100L, 1000L, 1,
"Alice", 5000L);
+
+ RowData result = converter.convert(record);
+
+ // Verify row kind is always INSERT for virtual tables
+ assertThat(result).isNotNull();
+ assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT);
+
+ // Verify metadata columns
+
assertThat(result.getString(0)).isEqualTo(StringData.fromString("insert"));
+ assertThat(result.getLong(1)).isEqualTo(100L); // log offset
+ assertThat(result.getTimestamp(2,
3)).isEqualTo(TimestampData.fromEpochMillis(1000L));
+
+ // Verify before is null for INSERT
+ assertThat(result.isNullAt(3)).isTrue();
+
+ // Verify after contains the row data
+ RowData afterRow = result.getRow(4, 3);
+ assertThat(afterRow).isNotNull();
+ assertThat(afterRow.getInt(0)).isEqualTo(1); // id
+ assertThat(afterRow.getString(1).toString()).isEqualTo("Alice"); //
name
+ assertThat(afterRow.getLong(2)).isEqualTo(5000L); // amount
+ }
+
+ @Test
+ void testConvertUpdateMerge() throws Exception {
+ // Send -U (UPDATE_BEFORE) - should return null (buffered)
+ LogRecord beforeRecord =
+ createLogRecord(ChangeType.UPDATE_BEFORE, 200L, 2000L, 2,
"Bob", 3000L);
+ RowData beforeResult = converter.convert(beforeRecord);
+ assertThat(beforeResult).isNull();
+
+ // Send +U (UPDATE_AFTER) - should return merged row
+ LogRecord afterRecord =
+ createLogRecord(ChangeType.UPDATE_AFTER, 201L, 2000L, 2,
"Bob-Updated", 4000L);
+ RowData result = converter.convert(afterRecord);
+
+ assertThat(result).isNotNull();
+ assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT);
+
+ // Verify metadata columns
+
assertThat(result.getString(0)).isEqualTo(StringData.fromString("update"));
+ // Offset and timestamp should be from the -U record (first entry of
update pair)
+ assertThat(result.getLong(1)).isEqualTo(200L);
+ assertThat(result.getTimestamp(2,
3)).isEqualTo(TimestampData.fromEpochMillis(2000L));
+
+ // Verify before contains old data
+ RowData beforeRow = result.getRow(3, 3);
+ assertThat(beforeRow).isNotNull();
+ assertThat(beforeRow.getInt(0)).isEqualTo(2);
+ assertThat(beforeRow.getString(1).toString()).isEqualTo("Bob");
+ assertThat(beforeRow.getLong(2)).isEqualTo(3000L);
+
+ // Verify after contains new data
+ RowData afterRow = result.getRow(4, 3);
+ assertThat(afterRow).isNotNull();
+ assertThat(afterRow.getInt(0)).isEqualTo(2);
+ assertThat(afterRow.getString(1).toString()).isEqualTo("Bob-Updated");
+ assertThat(afterRow.getLong(2)).isEqualTo(4000L);
+ }
+
+ @Test
+ void testConvertDeleteRecord() throws Exception {
+ LogRecord record = createLogRecord(ChangeType.DELETE, 300L, 3000L, 3,
"Charlie", 1000L);
+
+ RowData result = converter.convert(record);
+
+ assertThat(result).isNotNull();
+ assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT);
+
+ // Verify metadata columns
+
assertThat(result.getString(0)).isEqualTo(StringData.fromString("delete"));
+ assertThat(result.getLong(1)).isEqualTo(300L);
+ assertThat(result.getTimestamp(2,
3)).isEqualTo(TimestampData.fromEpochMillis(3000L));
+
+ // Verify before contains the deleted row data
+ RowData beforeRow = result.getRow(3, 3);
+ assertThat(beforeRow).isNotNull();
+ assertThat(beforeRow.getInt(0)).isEqualTo(3);
+ assertThat(beforeRow.getString(1).toString()).isEqualTo("Charlie");
+ assertThat(beforeRow.getLong(2)).isEqualTo(1000L);
+
+ // Verify after is null for DELETE
+ assertThat(result.isNullAt(4)).isTrue();
+ }
+
+ @Test
+ void testUpdateBeforeReturnsNull() throws Exception {
+ LogRecord record =
+ createLogRecord(ChangeType.UPDATE_BEFORE, 400L, 4000L, 4,
"Diana", 2000L);
+
+ // UPDATE_BEFORE should return null (buffered for merging)
+ RowData result = converter.convert(record);
+ assertThat(result).isNull();
+ }
+
+ @Test
+ void testUpdateAfterWithoutBeforeThrows() throws Exception {
+ // Sending +U without a preceding -U should throw
+ LogRecord record = createLogRecord(ChangeType.UPDATE_AFTER, 500L,
5000L, 5, "Eve", 6000L);
+
+ assertThatThrownBy(() -> converter.convert(record))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("UPDATE_AFTER (+U) without a preceding
UPDATE_BEFORE (-U)");
+ }
+
+ @Test
+ void testAppendOnlyUnsupported() throws Exception {
+ LogRecord record = createLogRecord(ChangeType.APPEND_ONLY, 600L,
6000L, 6, "Frank", 7000L);
+
+ assertThatThrownBy(() -> converter.convert(record))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("$binlog virtual table does not support
change type");
+ }
+
+ @Test
+ void testProducedTypeHasNestedRowColumns() {
+ org.apache.flink.table.types.logical.RowType producedType =
converter.getProducedType();
+
+ // Should have 5 columns: _change_type, _log_offset,
_commit_timestamp, before, after
+ assertThat(producedType.getFieldCount()).isEqualTo(5);
+
+ // Check column names
+ assertThat(producedType.getFieldNames())
+ .containsExactly(
+ "_change_type", "_log_offset", "_commit_timestamp",
"before", "after");
+
+ // Check metadata column types
+ assertThat(producedType.getTypeAt(0))
+
.isInstanceOf(org.apache.flink.table.types.logical.VarCharType.class);
+ assertThat(producedType.getTypeAt(1))
+
.isInstanceOf(org.apache.flink.table.types.logical.BigIntType.class);
+ assertThat(producedType.getTypeAt(2))
+
.isInstanceOf(org.apache.flink.table.types.logical.LocalZonedTimestampType.class);
+
+ // Check before and after are ROW types
+ assertThat(producedType.getTypeAt(3))
+
.isInstanceOf(org.apache.flink.table.types.logical.RowType.class);
+ assertThat(producedType.getTypeAt(4))
+
.isInstanceOf(org.apache.flink.table.types.logical.RowType.class);
+
+ // Check nested ROW has the original columns
+ org.apache.flink.table.types.logical.RowType beforeType =
+ (org.apache.flink.table.types.logical.RowType)
producedType.getTypeAt(3);
+ assertThat(beforeType.getFieldNames()).containsExactly("id", "name",
"amount");
+
+ // before/after ROW types should be nullable
+ assertThat(producedType.getTypeAt(3).isNullable()).isTrue();
+ assertThat(producedType.getTypeAt(4).isNullable()).isTrue();
+ }
+
+ @Test
+ void testMultipleUpdatesInSequence() throws Exception {
+ // First update pair
+ converter.convert(createLogRecord(ChangeType.UPDATE_BEFORE, 10L,
1000L, 1, "A", 100L));
+ RowData result1 =
+ converter.convert(
+ createLogRecord(ChangeType.UPDATE_AFTER, 11L, 1000L,
1, "B", 200L));
+ assertThat(result1).isNotNull();
+
assertThat(result1.getString(0)).isEqualTo(StringData.fromString("update"));
+
+ // Second update pair (converter state should be clean after first
merge)
+ converter.convert(createLogRecord(ChangeType.UPDATE_BEFORE, 20L,
2000L, 1, "B", 200L));
+ RowData result2 =
+ converter.convert(
+ createLogRecord(ChangeType.UPDATE_AFTER, 21L, 2000L,
1, "C", 300L));
+ assertThat(result2).isNotNull();
+
assertThat(result2.getString(0)).isEqualTo(StringData.fromString("update"));
+ assertThat(result2.getLong(1)).isEqualTo(20L); // offset from second -U
+ }
+
+ private LogRecord createLogRecord(
+ ChangeType changeType, long offset, long timestamp, int id, String
name, long amount)
+ throws Exception {
+ // Create an IndexedRow with test data
+ IndexedRow row = new IndexedRow(testRowType.getChildren().toArray(new
DataType[0]));
+ try (IndexedRowWriter writer =
+ new IndexedRowWriter(testRowType.getChildren().toArray(new
DataType[0]))) {
+ writer.writeInt(id);
+ writer.writeString(BinaryString.fromString(name));
+ writer.writeLong(amount);
+ writer.complete();
+
+ row.pointTo(writer.segment(), 0, writer.position());
+
+ return new GenericRecord(offset, timestamp, changeType, row);
+ }
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java
index c569ba86d..55f83a514 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/ChangelogRowConverterTest.java
@@ -65,7 +65,7 @@ class ChangelogRowConverterTest {
assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT);
// Verify metadata columns
- assertThat(result.getString(0)).isEqualTo(StringData.fromString("+I"));
+
assertThat(result.getString(0)).isEqualTo(StringData.fromString("insert"));
assertThat(result.getLong(1)).isEqualTo(100L); // log offset
assertThat(result.getTimestamp(2, 3)).isNotNull(); // commit timestamp
@@ -88,7 +88,7 @@ class ChangelogRowConverterTest {
assertThat(result.getRowKind()).isEqualTo(RowKind.INSERT);
// Verify change type metadata
- assertThat(result.getString(0)).isEqualTo(StringData.fromString("-U"));
+
assertThat(result.getString(0)).isEqualTo(StringData.fromString("update_before"));
assertThat(result.getLong(1)).isEqualTo(200L);
// Verify physical columns
@@ -103,7 +103,7 @@ class ChangelogRowConverterTest {
RowData result = converter.convert(record);
- assertThat(result.getString(0)).isEqualTo(StringData.fromString("+U"));
+
assertThat(result.getString(0)).isEqualTo(StringData.fromString("update_after"));
assertThat(result.getLong(1)).isEqualTo(201L);
assertThat(result.getInt(3)).isEqualTo(2);
assertThat(result.getString(4).toString()).isEqualTo("Bob");
@@ -116,7 +116,7 @@ class ChangelogRowConverterTest {
RowData result = converter.convert(record);
- assertThat(result.getString(0)).isEqualTo(StringData.fromString("-D"));
+
assertThat(result.getString(0)).isEqualTo(StringData.fromString("delete"));
assertThat(result.getLong(1)).isEqualTo(300L);
assertThat(result.getInt(3)).isEqualTo(3);
assertThat(result.getString(4).toString()).isEqualTo("Charlie");
@@ -151,7 +151,7 @@ class ChangelogRowConverterTest {
converter
.convert(createLogRecord(ChangeType.INSERT,
1L, 1, "Test", 100L))
.getString(0))
- .isEqualTo(StringData.fromString("+I"));
+ .isEqualTo(StringData.fromString("insert"));
assertThat(
converter
@@ -159,7 +159,7 @@ class ChangelogRowConverterTest {
createLogRecord(
ChangeType.UPDATE_BEFORE, 2L,
1, "Test", 100L))
.getString(0))
- .isEqualTo(StringData.fromString("-U"));
+ .isEqualTo(StringData.fromString("update_before"));
assertThat(
converter
@@ -167,13 +167,13 @@ class ChangelogRowConverterTest {
createLogRecord(
ChangeType.UPDATE_AFTER, 3L,
1, "Test", 100L))
.getString(0))
- .isEqualTo(StringData.fromString("+U"));
+ .isEqualTo(StringData.fromString("update_after"));
assertThat(
converter
.convert(createLogRecord(ChangeType.DELETE,
4L, 1, "Test", 100L))
.getString(0))
- .isEqualTo(StringData.fromString("-D"));
+ .isEqualTo(StringData.fromString("delete"));
// For log tables (append-only)
assertThat(
@@ -182,7 +182,7 @@ class ChangelogRowConverterTest {
createLogRecord(
ChangeType.APPEND_ONLY, 5L, 1,
"Test", 100L))
.getString(0))
- .isEqualTo(StringData.fromString("+A"));
+ .isEqualTo(StringData.fromString("insert"));
}
private LogRecord createLogRecord(