MehulBatra commented on code in PR #2347:
URL: https://github.com/apache/fluss/pull/2347#discussion_r2712774474
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java:
##########
@@ -862,4 +876,108 @@ private static boolean isPrefixList(List<String> fullList, List<String> prefixLi
         }
         return true;
     }
+
+    /**
+     * Creates a virtual $changelog table by modifying the base table's schema to include
+     * metadata columns.
+     */
+    private CatalogBaseTable getVirtualChangelogTable(ObjectPath objectPath)
+            throws TableNotExistException, CatalogException {
+        // Extract the base table name (remove $changelog suffix)
+        String virtualTableName = objectPath.getObjectName();
+        String baseTableName =
+                virtualTableName.substring(
+                        0, virtualTableName.length() - CHANGELOG_TABLE_SUFFIX.length());
+
+        // Get the base table
+        ObjectPath baseObjectPath = new ObjectPath(objectPath.getDatabaseName(), baseTableName);
+        TablePath baseTablePath = toTablePath(baseObjectPath);
+
+        try {
+            // Retrieve base table info
+            TableInfo tableInfo = admin.getTableInfo(baseTablePath).get();
+
+            // Validate that this is a primary key table
+            if (tableInfo.getPhysicalPrimaryKeys().isEmpty()) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Virtual $changelog tables are only supported for primary key tables. "
+                                        + "Table %s does not have a primary key.",
+                                baseTablePath));
+            }
+
+            // Convert to Flink table
+            CatalogBaseTable catalogBaseTable = FlinkConversions.toFlinkTable(tableInfo);
+
+            if (!(catalogBaseTable instanceof CatalogTable)) {
+                throw new UnsupportedOperationException(
+                        "Virtual $changelog tables are only supported for regular tables");
+            }
+
+            CatalogTable baseTable = (CatalogTable) catalogBaseTable;
+
+            // Build the changelog schema by adding metadata columns
+            Schema originalSchema = baseTable.getUnresolvedSchema();
+            Schema changelogSchema = buildChangelogSchema(originalSchema);
+
+            // Copy options from base table
+            Map<String, String> newOptions = new HashMap<>(baseTable.getOptions());
+            newOptions.put(BOOTSTRAP_SERVERS.key(), bootstrapServers);
+            newOptions.putAll(securityConfigs);
+
+            // Create a new CatalogTable with the modified schema
+            return CatalogTable.of(
+                    changelogSchema,
+                    baseTable.getComment(),
+                    baseTable.getPartitionKeys(),
+                    newOptions);
+
+        } catch (Exception e) {
+            Throwable t = ExceptionUtils.stripExecutionException(e);
+            if (isTableNotExist(t)) {
+                throw new TableNotExistException(getName(), baseObjectPath);
+            } else {
+                throw new CatalogException(
+                        String.format(
+                                "Failed to get virtual changelog table %s in %s",
+                                objectPath, getName()),
+                        t);
+            }
+        }
+    }
+
+    private Schema buildChangelogSchema(Schema originalSchema) {
+        Schema.Builder builder = Schema.newBuilder();
+
+        // Add metadata columns first
+        builder.column("_change_type", org.apache.flink.table.api.DataTypes.STRING().notNull());
+        builder.column("_log_offset", org.apache.flink.table.api.DataTypes.BIGINT().notNull());
+        builder.column(
+                "_commit_timestamp",
+                org.apache.flink.table.api.DataTypes.TIMESTAMP(3).notNull());
Review Comment:
I am going ahead with TIMESTAMP_LTZ without an explicit precision.
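
For illustration only, a minimal sketch of what the metadata columns could look like with `_commit_timestamp` switched to `TIMESTAMP_LTZ` and no explicit precision. This assumes Flink's `DataTypes.TIMESTAMP_LTZ()` no-argument shortcut and a hypothetical helper name; it is not the final diff:

```java
// Sketch only (not the final change): metadata columns with _commit_timestamp
// declared as TIMESTAMP_LTZ without an explicit precision. Assumes the
// DataTypes.TIMESTAMP_LTZ() no-arg shortcut; everything else mirrors the diff above.
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

public class ChangelogMetadataColumnsSketch {

    // Hypothetical helper: adds the three changelog metadata columns to a schema builder.
    static Schema.Builder addChangelogMetadataColumns(Schema.Builder builder) {
        builder.column("_change_type", DataTypes.STRING().notNull());
        builder.column("_log_offset", DataTypes.BIGINT().notNull());
        // TIMESTAMP_LTZ() falls back to the type's default precision.
        builder.column("_commit_timestamp", DataTypes.TIMESTAMP_LTZ().notNull());
        return builder;
    }
}
```

If an explicit precision turns out to be needed later, `DataTypes.TIMESTAMP_LTZ(3)` would keep the millisecond precision of the current `TIMESTAMP(3)` column.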