yyanyy commented on a change in pull request #2285:
URL: https://github.com/apache/iceberg/pull/2285#discussion_r585840378
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -361,25 +362,27 @@ public void createTable(ObjectPath tablePath,
CatalogBaseTable table, boolean ig
validateFlinkTable(table);
Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
- PartitionSpec spec = toPartitionSpec(((CatalogTable)
table).getPartitionKeys(), icebergSchema);
+ PartitionSpec.Builder specBuilder = toPartitionSpec(((CatalogTable)
table).getPartitionKeys(), icebergSchema);
- ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
String location = null;
+ ImmutableMap.Builder<String, String> propsBuilder = ImmutableMap.builder();
Review comment:
Nit: this seems like an unnecessary change (renaming `properties` to `propsBuilder`).
##########
File path: flink/src/main/java/org/apache/iceberg/flink/PartitionUtil.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.Locale;
+import java.util.regex.Matcher;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Term;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+
+class PartitionUtil {
+
+ // All members of this class are static helpers; the private constructor prevents instantiation.
+ private PartitionUtil() {
+ }
+
+  /**
+   * Returns the table property key that declares a partition transform for {@code colName}.
+   *
+   * @param colName source column name
+   * @return {@link FlinkTableProperties#PARTITION_BY_PREFIX} immediately followed by {@code colName}
+   */
+  static String partitionPropKey(String colName) {
+    return FlinkTableProperties.PARTITION_BY_PREFIX + colName;
+  }
+
+  /**
+   * Tells whether a table property key declares a partition field.
+   *
+   * @param propKey table property key, may be null
+   * @return true only for a non-null key starting with {@link FlinkTableProperties#PARTITION_BY_PREFIX}
+   */
+  static boolean isPartitionField(String propKey) {
+    if (propKey == null) {
+      return false;
+    }
+    return propKey.startsWith(FlinkTableProperties.PARTITION_BY_PREFIX);
+  }
+
+  /**
+   * Extracts the column name from a partition property key.
+   *
+   * @param propKey table property key, may be null
+   * @return the text following {@link FlinkTableProperties#PARTITION_BY_PREFIX}, or null when the key
+   *     is null or does not start with that prefix
+   */
+  private static String partitionField(String propKey) {
+    if (!isPartitionField(propKey)) {
+      return null;
+    }
+    // Strip only the leading prefix: String.replace would also delete any later occurrence of the
+    // prefix text that happens to be part of the column name itself.
+    return propKey.substring(FlinkTableProperties.PARTITION_BY_PREFIX.length());
+  }
+
+ static void addPartitionField(Schema schema, PartitionSpec.Builder builder,
String propKey, String propValue) {
+ String colName = partitionField(propKey);
+ Preconditions.checkNotNull(colName,
+ "Table property '%s' is not an valid partition field property, please
use '%s<column-name>'", propKey,
+ FlinkTableProperties.PARTITION_BY_PREFIX);
+
+ Types.NestedField nestedField = schema.findField(colName);
+ Preconditions.checkNotNull(nestedField, "Cannot find field %s in schema:
%s", colName, schema);
+
+ Transform<?, ?> transform = Transforms.fromString(nestedField.type(),
propValue);
+ String transformString = transform.toString();
+ switch (transformType(transformString)) {
+ case "bucket":
+ builder.bucket(colName, findWidth(transformString));
+ break;
+
+ case "truncate":
+ builder.truncate(colName, findWidth(transformString));
+ break;
+
+ case "identify":
+ builder.identity(colName);
+ break;
+
+ case "year":
+ builder.year(colName);
+ break;
+
+ case "month":
+ builder.month(colName);
+ break;
+
+ case "day":
+ builder.day(colName);
+ break;
+
+ case "hour":
+ builder.hour(colName);
+ break;
+
+ case "void":
+ builder.alwaysNull(colName);
+ break;
+
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unknown transform %s for field %s",
transformString, colName));
+ }
+ }
+
+ static Term toIcebergTerm(String colName, Transform<?, ?> transform) {
+ String transformString = transform.toString();
+ switch (transformType(transformString)) {
+ case "bucket":
+ return Expressions.bucket(colName, findWidth(transformString));
+
+ case "truncate":
+ return Expressions.truncate(colName, findWidth(transformString));
+
+ case "identify":
Review comment:
Did you mean "identity"? That applies both here and at L74 and L139. We might also want test
coverage for this class to catch similar issues.
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -471,10 +477,11 @@ private static void validateFlinkTable(CatalogBaseTable
table) {
}
}
- private static PartitionSpec toPartitionSpec(List<String> partitionKeys,
Schema icebergSchema) {
+ private static PartitionSpec.Builder toPartitionSpec(List<String>
partitionKeys, Schema icebergSchema) {
Review comment:
nit: should we rename this to `toPartitionSpecBuilder`, since it now returns a `PartitionSpec.Builder`?
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -573,7 +597,7 @@ public boolean partitionExists(ObjectPath tablePath,
CatalogPartitionSpec partit
@Override
public void createPartition(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec, CatalogPartition partition,
- boolean ignoreIfExists) throws CatalogException {
+ boolean ignoreIfExists) throws CatalogException {
Review comment:
Nit: these look like unnecessary (whitespace-only) changes; the same applies below.
##########
File path: flink/src/main/java/org/apache/iceberg/flink/PartitionUtil.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.Locale;
+import java.util.regex.Matcher;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Term;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+
+class PartitionUtil {
+
+ // All members of this class are static helpers; the private constructor prevents instantiation.
+ private PartitionUtil() {
+ }
+
+ static String partitionPropKey(String colName) {
Review comment:
nit: this looks like it's only used by tests; could we move it into the test code?
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
##########
@@ -518,6 +526,22 @@ private static void commitChanges(Table table, String
setLocation, String setSna
.commit();
}
+ if (newSpec != null) {
Review comment:
Looks like `newSpec` is never null after this change, which means this transaction is always
triggered. Should we compare it with the old spec and skip the transaction when the two are
the same?
##########
File path: flink/src/main/java/org/apache/iceberg/flink/PartitionUtil.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.Locale;
+import java.util.regex.Matcher;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Term;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+
+class PartitionUtil {
+
+ private PartitionUtil() {
+ }
+
+  /**
+   * Returns the table property key that declares a partition transform for {@code colName}.
+   *
+   * @param colName source column name
+   * @return {@link FlinkTableProperties#PARTITION_BY_PREFIX} immediately followed by {@code colName}
+   */
+  static String partitionPropKey(String colName) {
+    return FlinkTableProperties.PARTITION_BY_PREFIX + colName;
+  }
+
+  /**
+   * Tells whether a table property key declares a partition field.
+   *
+   * @param propKey table property key, may be null
+   * @return true only for a non-null key starting with {@link FlinkTableProperties#PARTITION_BY_PREFIX}
+   */
+  static boolean isPartitionField(String propKey) {
+    if (propKey == null) {
+      return false;
+    }
+    return propKey.startsWith(FlinkTableProperties.PARTITION_BY_PREFIX);
+  }
+
+  /**
+   * Extracts the column name from a partition property key.
+   *
+   * @param propKey table property key, may be null
+   * @return the text following {@link FlinkTableProperties#PARTITION_BY_PREFIX}, or null when the key
+   *     is null or does not start with that prefix
+   */
+  private static String partitionField(String propKey) {
+    if (!isPartitionField(propKey)) {
+      return null;
+    }
+    // Strip only the leading prefix: String.replace would also delete any later occurrence of the
+    // prefix text that happens to be part of the column name itself.
+    return propKey.substring(FlinkTableProperties.PARTITION_BY_PREFIX.length());
+  }
+
+ static void addPartitionField(Schema schema, PartitionSpec.Builder builder,
String propKey, String propValue) {
+ String colName = partitionField(propKey);
+ Preconditions.checkNotNull(colName,
+ "Table property '%s' is not an valid partition field property, please
use '%s<column-name>'", propKey,
+ FlinkTableProperties.PARTITION_BY_PREFIX);
+
+ Types.NestedField nestedField = schema.findField(colName);
+ Preconditions.checkNotNull(nestedField, "Cannot find field %s in schema:
%s", colName, schema);
+
+ Transform<?, ?> transform = Transforms.fromString(nestedField.type(),
propValue);
Review comment:
Can we check the transform type directly with `instanceof` and then parse and assign for each
type? I think that would be safer than string comparison.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]