MehulBatra commented on code in PR #2347:
URL: https://github.com/apache/fluss/pull/2347#discussion_r2713099884


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java:
##########
@@ -262,4 +268,72 @@ private LakeTableFactory mayInitLakeTableFactory() {
         }
         return lakeTableFactory;
     }
+
+    /** Creates a ChangelogFlinkTableSource for $changelog virtual tables. */
+    private DynamicTableSource createChangelogTableSource(
+            Context context, ObjectIdentifier tableIdentifier, String 
tableName) {
+        // Extract the base table name by removing the $changelog suffix
+        String baseTableName =
+                tableName.substring(
+                        0, tableName.length() - 
FlinkCatalog.CHANGELOG_TABLE_SUFFIX.length());
+
+        boolean isStreamingMode =
+                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING;
+
+        // tableOutputType includes metadata columns: [_change_type, 
_log_offset, _commit_timestamp,
+        // data_cols...]
+        RowType tableOutputType = (RowType) 
context.getPhysicalRowDataType().getLogicalType();
+
+        // Extract data columns type (skip the 3 metadata columns) for index 
calculations
+        int numMetadataColumns = 3;
+        List<RowType.RowField> dataFields =
+                tableOutputType
+                        .getFields()
+                        .subList(numMetadataColumns, 
tableOutputType.getFieldCount());
+        RowType dataColumnsType = new RowType(new ArrayList<>(dataFields));
+
+        Map<String, String> catalogTableOptions = 
context.getCatalogTable().getOptions();
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        final ReadableConfig tableOptions = helper.getOptions();

Review Comment:
   Agreed, will create a helper method to share between dynamictablesource and 
changelogtablesource, later be used by binlog aswell!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to