Abacn commented on code in PR #36571:
URL: https://github.com/apache/beam/pull/36571#discussion_r2754876731


##########
sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl:
##########
@@ -537,6 +599,61 @@ SqlCreate SqlCreateExternalTable(Span s, boolean replace) :
     }
 }
 
+/**
+ * ALTER TABLE table_name

Review Comment:
   Is there a reference spec/syntax that we intend these DDL statements to 
align with? For example, https://iceberg.apache.org/docs/latest/spark-ddl/ uses 
Spark DDL. If there is such a reference spec, consider adding a link and a note 
that future additions should align with it.



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterCatalog.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import static 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogManagerSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogSchema;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlAlter;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperator;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlUtil;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlWriter;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Pair;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class SqlAlterCatalog extends SqlAlter implements 
BeamSqlParser.ExecutableStatement {
+  private static final SqlOperator OPERATOR =
+      new SqlSpecialOperator("ALTER CATALOG", SqlKind.OTHER_DDL);
+  private final SqlIdentifier name;
+  private final @Nullable SqlNodeList setProps;
+  private final @Nullable SqlNodeList resetProps;
+
+  public SqlAlterCatalog(
+      SqlParserPos pos,
+      @Nullable String scope,
+      SqlNode name,
+      @Nullable SqlNodeList setProps,
+      @Nullable SqlNodeList resetProps) {
+    super(pos, scope);
+    this.name = SqlDdlNodes.getIdentifier(name, pos);
+    this.setProps = setProps;
+    this.resetProps = resetProps;
+  }
+
+  @Override
+  public void execute(CalcitePrepare.Context context) {
+    final Pair<CalciteSchema, String> pair = SqlDdlNodes.schema(context, true, 
name);
+    Schema schema = pair.left.schema;
+
+    if (!(schema instanceof CatalogManagerSchema)) {
+      throw SqlUtil.newContextException(
+          name.getParserPosition(),
+          RESOURCE.internal(
+              "Attempting to alter catalog '"
+                  + SqlDdlNodes.name(name)
+                  + "' with unexpected Calcite Schema of type "
+                  + schema.getClass()));
+    }
+
+    CatalogSchema catalogSchema =
+        ((CatalogManagerSchema) 
schema).getCatalogSchema(SqlDdlNodes.getString(name));
+
+    Map<String, String> setPropsMap = SqlDdlNodes.getStringMap(setProps);
+    Collection<String> resetPropsList = SqlDdlNodes.getStringList(resetProps);
+
+    ImmutableList.Builder<String> overlappingPropsBuilder = 
ImmutableList.builder();
+    
resetPropsList.stream().filter(setPropsMap::containsKey).forEach(overlappingPropsBuilder::add);
+    List<String> overlappingProps = overlappingPropsBuilder.build();
+    checkState(
+        overlappingProps.isEmpty(),
+        "Invalid %s call: Found overlapping properties between SET and RESET: 
%s.",
+        OPERATOR,
+        overlappingProps);
+
+    catalogSchema.updateProperties(setPropsMap, resetPropsList);
+  }
+
+  @Override
+  public void unparseAlterOperation(SqlWriter writer, int left, int right) {

Review Comment:
   This method contains some complex logic for assembling a statement string. 
Consider unit testing it (same for SqlAlterTable.unparseAlterOperation).



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterTable.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import static 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogManagerSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogSchema;
+import org.apache.beam.sdk.extensions.sql.impl.TableName;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlAlter;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperator;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlUtil;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlWriter;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Pair;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class SqlAlterTable extends SqlAlter implements 
BeamSqlParser.ExecutableStatement {
+  private static final SqlOperator OPERATOR =
+      new SqlSpecialOperator("ALTER TABLE", SqlKind.ALTER_TABLE);
+  private final SqlIdentifier name;
+  private final @Nullable List<Field> columnsToAdd;
+  private final @Nullable SqlNodeList columnsToDrop;
+  private final @Nullable SqlNodeList partitionsToAdd;
+  private final @Nullable SqlNodeList partitionsToDrop;
+  private final @Nullable SqlNodeList setProps;
+  private final @Nullable SqlNodeList resetProps;
+
+  public SqlAlterTable(

Review Comment:
   Same here: the constructor of SqlAlterTable has even more parameters.



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterCatalog.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import static 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogManagerSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogSchema;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlAlter;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperator;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlUtil;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlWriter;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Pair;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class SqlAlterCatalog extends SqlAlter implements 
BeamSqlParser.ExecutableStatement {
+  private static final SqlOperator OPERATOR =
+      new SqlSpecialOperator("ALTER CATALOG", SqlKind.OTHER_DDL);
+  private final SqlIdentifier name;
+  private final @Nullable SqlNodeList setProps;
+  private final @Nullable SqlNodeList resetProps;
+
+  public SqlAlterCatalog(

Review Comment:
   Consider adding a Javadoc or comments. It is not obvious what the parameters 
`setProps` and `resetProps` do.



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/AlterTestTableOps.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.test;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.TableWithRows;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+
+public class AlterTestTableOps implements AlterTableOps {
+  private final TableWithRows tableWithRows;
+
+  AlterTestTableOps(TableWithRows tableWithRows) {
+    this.tableWithRows = tableWithRows;
+  }
+
+  @Override
+  public void updateTableProperties(Map<String, String> setProps, List<String> 
resetProps) {
+    ObjectNode props = tableWithRows.getTable().getProperties();
+    resetProps.forEach(props::remove);
+    setProps.forEach(props::put);
+    
tableWithRows.setTable(tableWithRows.getTable().toBuilder().properties(props).build());
+  }
+
+  @Override
+  public void updateSchema(List<Field> columnsToAdd, Collection<String> 
columnsToDrop) {
+    if (!columnsToAdd.isEmpty() && !tableWithRows.getRows().isEmpty()) {
+      ImmutableList.Builder<String> requiredFields = ImmutableList.builder();
+      for (Field f : columnsToAdd) {
+        if (!f.getType().getNullable()) {
+          requiredFields.add(f.getName());
+        }
+      }
+      Preconditions.checkArgument(
+          requiredFields.build().isEmpty(),
+          "Cannot add required fields %s because table '%s' already contains 
rows.",
+          requiredFields.build(),
+          tableWithRows.getTable().getName());
+    }
+
+    // update the schema
+    List<Field> schemaFields = 
tableWithRows.getTable().getSchema().getFields();
+    ImmutableList.Builder<Field> newSchemaFields = ImmutableList.builder();
+    // remove dropped fields
+    schemaFields.stream()
+        .filter(f -> !columnsToDrop.contains(f.getName()))
+        .forEach(newSchemaFields::add);
+    // add new fields
+    newSchemaFields.addAll(columnsToAdd);
+    Schema newSchema = Schema.of(newSchemaFields.build().toArray(new 
Field[0]));
+    
tableWithRows.setTable(tableWithRows.getTable().toBuilder().schema(newSchema).build());
+
+    // update existing rows
+    List<Row> rows = tableWithRows.getRows();
+    List<Row> newRows = new CopyOnWriteArrayList<>();

Review Comment:
   This creates a new CopyOnWriteArrayList and then inserts rows one by one. 
This is expensive, because every add copies the whole underlying array.
   
   If we really need a CopyOnWriteArrayList, we can construct it in one go 
once all the items have been prepared.



##########
sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java:
##########
@@ -60,7 +61,12 @@ public void createTable(Table table) {
     } else {
       String identifier = getIdentifier(table);
       try {
-        catalogConfig.createTable(identifier, table.getSchema(), 
table.getPartitionFields());
+        Map<String, String> properties =
+            TableUtils.getObjectMapper()
+                .convertValue(table.getProperties(), new 
TypeReference<Map<String, String>>() {});
+        ;

Review Comment:
   nit: stray semicolon.



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterCatalog.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import static 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogManagerSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogSchema;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlAlter;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperator;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlUtil;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlWriter;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Pair;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class SqlAlterCatalog extends SqlAlter implements 
BeamSqlParser.ExecutableStatement {

Review Comment:
   Just a question - these DDLs are added in the main sql module. Are they now 
supported by Beam SQL in general, or are they only supported for the Iceberg 
catalog and tables?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to