snuyanzin commented on code in PR #27488:
URL: https://github.com/apache/flink/pull/27488#discussion_r2740341193
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java:
##########
@@ -77,24 +155,343 @@ public String asSummaryString() {
}
private static String toString(TableChange tableChange) {
- if (tableChange instanceof TableChange.ModifyRefreshStatus) {
- TableChange.ModifyRefreshStatus refreshStatus =
- (TableChange.ModifyRefreshStatus) tableChange;
+ if (tableChange instanceof ModifyRefreshStatus) {
+ ModifyRefreshStatus refreshStatus = (ModifyRefreshStatus)
tableChange;
return String.format(
" MODIFY REFRESH STATUS TO '%s'",
refreshStatus.getRefreshStatus());
- } else if (tableChange instanceof TableChange.ModifyRefreshHandler) {
- TableChange.ModifyRefreshHandler refreshHandler =
- (TableChange.ModifyRefreshHandler) tableChange;
+ } else if (tableChange instanceof ModifyRefreshHandler) {
+ ModifyRefreshHandler refreshHandler = (ModifyRefreshHandler)
tableChange;
return String.format(
" MODIFY REFRESH HANDLER DESCRIPTION TO '%s'",
refreshHandler.getRefreshHandlerDesc());
- } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) {
- TableChange.ModifyDefinitionQuery definitionQuery =
- (TableChange.ModifyDefinitionQuery) tableChange;
+ } else if (tableChange instanceof ModifyDefinitionQuery) {
+ ModifyDefinitionQuery definitionQuery = (ModifyDefinitionQuery)
tableChange;
return String.format(
" MODIFY DEFINITION QUERY TO '%s'",
definitionQuery.getDefinitionQuery());
} else {
return AlterTableChangeOperation.toString(tableChange);
}
}
+
+ private static class MTContext {
+ private boolean isQueryChange;
+ private @Nullable TableDistribution distribution;
+ private RefreshStatus refreshStatus;
+ private @Nullable String refreshHandlerDesc;
+ private byte[] refreshHandlerBytes;
+ private List<UnresolvedColumn> columns;
+ private List<UnresolvedWatermarkSpec> watermarkSpecs;
+ private String primaryKeyName = null;
+ private List<String> primaryKeyColumns = null;
+ private int droppedPersistedCnt = 0;
+ private String originalQuery;
+ private String expandedQuery;
+ private CatalogMaterializedTable oldTable;
+
+ public MTContext(CatalogMaterializedTable oldTable) {
+ this.distribution = oldTable.getDistribution().orElse(null);
+ this.refreshStatus = oldTable.getRefreshStatus();
+ this.refreshHandlerDesc =
oldTable.getRefreshHandlerDescription().orElse(null);
+ this.refreshHandlerBytes = oldTable.getSerializedRefreshHandler();
+ this.watermarkSpecs =
oldTable.getUnresolvedSchema().getWatermarkSpecs();
+ this.columns = new
LinkedList<>(oldTable.getUnresolvedSchema().getColumns());
+ Schema.UnresolvedPrimaryKey primaryKey =
+
oldTable.getUnresolvedSchema().getPrimaryKey().orElse(null);
+ if (primaryKey != null) {
+ this.primaryKeyName = primaryKey.getConstraintName();
+ this.primaryKeyColumns = primaryKey.getColumnNames();
+ }
+ originalQuery = oldTable.getOriginalQuery();
+ expandedQuery = oldTable.getExpandedQuery();
+ this.oldTable = oldTable;
+ }
+
+ private void setColumnAtPosition(UnresolvedColumn column,
ColumnPosition position) {
+ if (position == null) {
+ columns.add(column);
+ } else if (position == ColumnPosition.first()) {
+ columns.add(0, column);
+ } else {
+ String after = ((After) position).column();
+ int index = getColumnIndex(after);
+
+ columns.add(index + 1, column);
+ }
+ }
+
+ private int getColumnIndex(String name) {
+ for (int i = 0; i < columns.size(); i++) {
+ if (Objects.equals(name, columns.get(i).getName())) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ private void applyTableChanges(List<TableChange> tableChanges) {
+ isQueryChange = tableChanges.stream().anyMatch(t -> t instanceof
ModifyDefinitionQuery);
Review Comment:
depending on whether it is a query change or not the validation is slightly
different
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java:
##########
@@ -77,24 +155,343 @@ public String asSummaryString() {
}
private static String toString(TableChange tableChange) {
- if (tableChange instanceof TableChange.ModifyRefreshStatus) {
- TableChange.ModifyRefreshStatus refreshStatus =
- (TableChange.ModifyRefreshStatus) tableChange;
+ if (tableChange instanceof ModifyRefreshStatus) {
+ ModifyRefreshStatus refreshStatus = (ModifyRefreshStatus)
tableChange;
return String.format(
" MODIFY REFRESH STATUS TO '%s'",
refreshStatus.getRefreshStatus());
- } else if (tableChange instanceof TableChange.ModifyRefreshHandler) {
- TableChange.ModifyRefreshHandler refreshHandler =
- (TableChange.ModifyRefreshHandler) tableChange;
+ } else if (tableChange instanceof ModifyRefreshHandler) {
+ ModifyRefreshHandler refreshHandler = (ModifyRefreshHandler)
tableChange;
return String.format(
" MODIFY REFRESH HANDLER DESCRIPTION TO '%s'",
refreshHandler.getRefreshHandlerDesc());
- } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) {
- TableChange.ModifyDefinitionQuery definitionQuery =
- (TableChange.ModifyDefinitionQuery) tableChange;
+ } else if (tableChange instanceof ModifyDefinitionQuery) {
+ ModifyDefinitionQuery definitionQuery = (ModifyDefinitionQuery)
tableChange;
return String.format(
" MODIFY DEFINITION QUERY TO '%s'",
definitionQuery.getDefinitionQuery());
} else {
return AlterTableChangeOperation.toString(tableChange);
}
}
+
+ private static class MTContext {
+ private boolean isQueryChange;
+ private @Nullable TableDistribution distribution;
+ private RefreshStatus refreshStatus;
+ private @Nullable String refreshHandlerDesc;
+ private byte[] refreshHandlerBytes;
+ private List<UnresolvedColumn> columns;
+ private List<UnresolvedWatermarkSpec> watermarkSpecs;
+ private String primaryKeyName = null;
+ private List<String> primaryKeyColumns = null;
+ private int droppedPersistedCnt = 0;
+ private String originalQuery;
+ private String expandedQuery;
+ private CatalogMaterializedTable oldTable;
+
+ public MTContext(CatalogMaterializedTable oldTable) {
+ this.distribution = oldTable.getDistribution().orElse(null);
+ this.refreshStatus = oldTable.getRefreshStatus();
+ this.refreshHandlerDesc =
oldTable.getRefreshHandlerDescription().orElse(null);
+ this.refreshHandlerBytes = oldTable.getSerializedRefreshHandler();
+ this.watermarkSpecs =
oldTable.getUnresolvedSchema().getWatermarkSpecs();
+ this.columns = new
LinkedList<>(oldTable.getUnresolvedSchema().getColumns());
+ Schema.UnresolvedPrimaryKey primaryKey =
+
oldTable.getUnresolvedSchema().getPrimaryKey().orElse(null);
+ if (primaryKey != null) {
+ this.primaryKeyName = primaryKey.getConstraintName();
+ this.primaryKeyColumns = primaryKey.getColumnNames();
+ }
+ originalQuery = oldTable.getOriginalQuery();
+ expandedQuery = oldTable.getExpandedQuery();
+ this.oldTable = oldTable;
+ }
+
+ private void setColumnAtPosition(UnresolvedColumn column,
ColumnPosition position) {
+ if (position == null) {
+ columns.add(column);
+ } else if (position == ColumnPosition.first()) {
+ columns.add(0, column);
+ } else {
+ String after = ((After) position).column();
+ int index = getColumnIndex(after);
+
+ columns.add(index + 1, column);
+ }
+ }
+
+ private int getColumnIndex(String name) {
+ for (int i = 0; i < columns.size(); i++) {
+ if (Objects.equals(name, columns.get(i).getName())) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ private void applyTableChanges(List<TableChange> tableChanges) {
+ isQueryChange = tableChanges.stream().anyMatch(t -> t instanceof
ModifyDefinitionQuery);
Review Comment:
depending on whether it is a query change or not the validation is slightly
different
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]