spena commented on code in PR #25054:
URL: https://github.com/apache/flink/pull/25054#discussion_r1670627857
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java:
##########
@@ -312,113 +302,36 @@ private void appendDerivedPrimaryKey(@Nullable SqlTableConstraint derivedPrimary
"The base table already has a primary key. You might "
+ "want to specify EXCLUDING CONSTRAINTS.");
        } else if (derivedPrimaryKey != null) {
-            List<String> primaryKeyColumns = new ArrayList<>();
-            for (SqlNode primaryKeyNode : derivedPrimaryKey.getColumns()) {
-                String primaryKey = ((SqlIdentifier) primaryKeyNode).getSimple();
-                if (!columns.containsKey(primaryKey)) {
-                    throw new ValidationException(
-                            String.format(
-                                    "Primary key column '%s' is not defined in the schema at %s",
-                                    primaryKey, primaryKeyNode.getParserPosition()));
-                }
-                if (!(columns.get(primaryKey) instanceof UnresolvedPhysicalColumn)) {
-                    throw new ValidationException(
-                            String.format(
-                                    "Could not create a PRIMARY KEY with column '%s' at %s.\n"
-                                            + "A PRIMARY KEY constraint must be declared on physical columns.",
-                                    primaryKey, primaryKeyNode.getParserPosition()));
-                }
-                primaryKeyColumns.add(primaryKey);
-            }
-            primaryKey =
-                    new UnresolvedPrimaryKey(
-                            derivedPrimaryKey
-                                    .getConstraintName()
-                                    .orElseGet(
-                                            () ->
-                                                    primaryKeyColumns.stream()
-                                                            .collect(
-                                                                    Collectors.joining(
-                                                                            "_", "PK_", ""))),
-                            primaryKeyColumns);
+            setPrimaryKey(derivedPrimaryKey);
         }
     }
     private void appendDerivedWatermarks(
             Map<FeatureOption, MergingStrategy> mergingStrategies,
             List<SqlWatermark> derivedWatermarkSpecs) {
-        for (SqlWatermark derivedWatermarkSpec : derivedWatermarkSpecs) {
-            SqlIdentifier eventTimeColumnName = derivedWatermarkSpec.getEventTimeColumnName();
-
-            HashMap<String, RelDataType> nameToTypeMap = new LinkedHashMap<>();
-            nameToTypeMap.putAll(physicalFieldNamesToTypes);
-            nameToTypeMap.putAll(metadataFieldNamesToTypes);
-            nameToTypeMap.putAll(computedFieldNamesToTypes);
-
-            verifyRowtimeAttribute(mergingStrategies, eventTimeColumnName, nameToTypeMap);
-            String rowtimeAttribute = eventTimeColumnName.toString();
-
-            SqlNode expression = derivedWatermarkSpec.getWatermarkStrategy();
-
-            // this will validate and expand function identifiers.
-            SqlNode validated =
-                    sqlValidator.validateParameterizedExpression(expression, nameToTypeMap);
-
-            watermarkSpecs.put(
-                    rowtimeAttribute,
-                    new UnresolvedWatermarkSpec(
-                            rowtimeAttribute,
-                            new SqlCallExpression(escapeExpressions.apply(validated))));
-        }
-    }
-
-    private void verifyRowtimeAttribute(
-            Map<FeatureOption, MergingStrategy> mergingStrategies,
-            SqlIdentifier eventTimeColumnName,
-            Map<String, RelDataType> allFieldsTypes) {
-        String fullRowtimeExpression = eventTimeColumnName.toString();
-        boolean specAlreadyExists = watermarkSpecs.containsKey(fullRowtimeExpression);
-
-        if (specAlreadyExists
-                && mergingStrategies.get(FeatureOption.WATERMARKS)
-                        != MergingStrategy.OVERWRITING) {
-            throw new ValidationException(
-                    String.format(
-                            "There already exists a watermark spec for column '%s' in the base table. You "
-                                    + "might want to specify EXCLUDING WATERMARKS or OVERWRITING WATERMARKS.",
-                            fullRowtimeExpression));
-        }
-
-        List<String> components = eventTimeColumnName.names;
-        if (!allFieldsTypes.containsKey(components.get(0))) {
-            throw new ValidationException(
-                    String.format(
-                            "The rowtime attribute field '%s' is not defined in the table schema, at %s\n"
-                                    + "Available fields: [%s]",
-                            fullRowtimeExpression,
-                            eventTimeColumnName.getParserPosition(),
-                            allFieldsTypes.keySet().stream()
-                                    .collect(Collectors.joining("', '", "'", "'"))));
-        }
-
-        if (components.size() > 1) {
-            RelDataType componentType = allFieldsTypes.get(components.get(0));
-            for (int i = 1; i < components.size(); i++) {
-                RelDataTypeField field = componentType.getField(components.get(i), true, false);
-                if (field == null) {
+        if (mergingStrategies.get(SqlTableLike.FeatureOption.WATERMARKS)
Review Comment:
Done
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala:
##########
@@ -399,6 +399,47 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
assertThat(actualUseStatement.sorted).isEqualTo(expected.sorted)
}
+ @TestTemplate
+ def testCreateTableAsSelectWithPrimaryKey(): Unit = {
Review Comment:
Removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]