pengzhiwei2018 commented on a change in pull request #2645: URL: https://github.com/apache/hudi/pull/2645#discussion_r619743008
########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/UuidKeyGenerator.java ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +/** + * A KeyGenerator which use the uuid as the record key. + */ +public class UuidKeyGenerator extends BuiltinKeyGenerator { Review comment: Yes, it make sense to me! I prefer to make this in a separate PR because it need some work to pass the commit time. I have file a JIRA at [1840](https://issues.apache.org/jira/browse/HUDI-1840) ########## File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java ########## @@ -59,6 +60,8 @@ public static final String HOODIE_TABLE_VERSION_PROP_NAME = "hoodie.table.version"; public static final String HOODIE_TABLE_PRECOMBINE_FIELD = "hoodie.table.precombine.field"; public static final String HOODIE_TABLE_PARTITION_COLUMNS = "hoodie.table.partition.columns"; + public static final String HOODIE_TABLE_ROWKEY_FIELDS = "hoodie.table.rowkey.fields"; + public static final String HOODIE_TABLE_SCHEMA = "hoodie.table.schema"; Review comment: make sense to me! ########## File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import scala.collection.JavaConverters._ +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField} + +class TestCreateTable extends TestHoodieSqlBase { + + test("Test Create Managed Hoodie Table") { + val tableName = generateTableName + // Create a managed table + spark.sql( + s""" + | create table $tableName ( + | id int, + | name string, + | price double, + | ts long + | ) using hudi + | options ( + | primaryKey = 'id', + | versionColumn = 'ts' Review comment: yes ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java ########## @@ -192,7 +193,7 @@ protected void initializeIncomingRecordsMap() { long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps()); LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), - new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema)); + new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(inputSchema)); Review comment: > On the inputSchema, is this always equal to the table's schema? The `writeSchema` is always equal to the table's schema, but the `inputSchema` is the schema of the input DataFrame. In the case of merge-into below, The table schema is `id:int, name:string` (see the define of `h0`) The writeSchema is `id:int, name:string` which is same with the table schema. The inputSchema is: `id: int, name:string, flag:int` which is the input DataFrame of `s0`. > Is `hoodie.avro.schema` what you are calling as inputSchema? Yes, the inputSchema is the `hoodie.avro.schema` which is the schema of input DataFrame. In my below case is the `s0` ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java ########## @@ -174,7 +174,8 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) { // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = createTable(config, hadoopConf); - HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds, extraMetadata, operationType, config.getSchema(), commitActionType); + HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds, + extraMetadata, operationType, config.getWriteSchema(), commitActionType); Review comment: Yes, the `writeSchema` may not equal to the `inputSchema` for MergeInto. The `inputSchema` is the schema of the incoming records( come from the `hoodie.avro.schema`) , we use it to parse the bytes to the avro record in the HoodiePayload. The `writeSchema` is always equal to the table schema, we use the `writeSchema` to write the record to the table. The `writeSchema` come from the `hoodie.write.schema` if we set this property, or not, we get it from the `hoodie.avro.schema`. So here, we pass the `writeSchema` to the `HoodieCommitMetadata`. ########## File path: hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java ########## @@ -212,7 +230,11 @@ public Schema getTableAvroSchemaWithoutMetadataFields(HoodieInstant instant) thr */ private Option<Schema> getTableSchemaFromCommitMetadata(boolean includeMetadataFields) { HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - return getTableSchemaFromCommitMetadata(timeline.lastInstant().get(), includeMetadataFields); + if (timeline.lastInstant().isPresent()) { Review comment: Here I add a test for `timeline.lastInstant().isPreset` to avoid crash when there is no commit in the table. ########## File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java ########## @@ -59,6 +60,8 @@ public static final String HOODIE_TABLE_VERSION_PROP_NAME = "hoodie.table.version"; public static final String HOODIE_TABLE_PRECOMBINE_FIELD = "hoodie.table.precombine.field"; public static final String HOODIE_TABLE_PARTITION_COLUMNS = "hoodie.table.partition.columns"; + public static final String HOODIE_TABLE_ROWKEY_FIELDS = "hoodie.table.rowkey.fields"; Review comment: ok ########## File path: hudi-spark-datasource/hudi-spark2/pom.xml ########## @@ -29,6 +29,8 @@ <properties> <main.basedir>${project.parent.parent.basedir}</main.basedir> + <scala.binary.version>2.11</scala.binary.version> Review comment: There is no need to add this here now. I will remove it. ########## File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java ########## @@ -591,6 +591,8 @@ public static PropertyBuilder withPropertyBuilder() { private HoodieTableType tableType; private String tableName; + private String tableSchema; Review comment: Yes, we need add some code for upgrade an exists hudi table to the sql. I will file a JIRA for this later. ########## File path: hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/UpdateTable.scala ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.Expression +// This code is just copy from v2Commands.scala in spark 3.0 +case class UpdateTable( Review comment: Yes, it is. The `hudi-spark2` will also no longer needed. ########## File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/MultiPartKeysValueExtractor.java ########## @@ -31,6 +32,11 @@ @Override public List<String> extractPartitionValuesInPath(String partitionPath) { + // If the partitionPath is empty string( which means none-partition table), the partition values Review comment: Yes, I will file a separate PR for this issue. After it is merged, I will remove this code. ########## File path: hudi-spark-datasource/hudi-spark2/src/main/antlr4/imports/SqlBase.g4 ########## @@ -0,0 +1,1099 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * This file is an adaptation of Presto's presto-parser/src/main/antlr4/com/facebook/presto/sql/parser/SqlBase.g4 grammar. Review comment: This file is just a copy of Spark's `SqlBase.g4` as we must refer spark's syntax. I will fix License header. > can we add this to NOTICE. there are examples there. whats the license of this file? I have found there is the notice for spark. So we do not need add any more. ########## File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import scala.collection.JavaConverters._ +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField} + +class TestCreateTable extends TestHoodieSqlBase { + + test("Test Create Managed Hoodie Table") { + val tableName = generateTableName + // Create a managed table + spark.sql( + s""" + | create table $tableName ( Review comment: The spark "In-Memory" Catalog. ########## File path: packaging/hudi-integ-test-bundle/pom.xml ########## @@ -73,8 +73,7 @@ <include>org.apache.hudi:hudi-spark-common</include> <include>org.apache.hudi:hudi-utilities_${scala.binary.version}</include> <include>org.apache.hudi:hudi-spark_${scala.binary.version}</include> - <include>org.apache.hudi:hudi-spark2_${scala.binary.version}</include> - <include>org.apache.hudi:hudi-spark3_2.12</include> + <include>org.apache.hudi:${hudi.spark.moudle}_${scala.binary.version}</include> Review comment: Yes, for the spark2 and spark3 module, only one of them can include into the project. We cannot include spark2 and spark3 to the project at the same time.Here I introduce a `hudi.spark.moudle` property to control this. I agree that there are too many properties for spark 2 vs 3. Let me think about how to simplify this. ########## File path: hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java ########## @@ -97,4 +86,22 @@ public DefaultHoodieRecordPayload(Option<GenericRecord> record) { } return metadata.isEmpty() ? Option.empty() : Option.of(metadata); } + + protected boolean needUpdatePersistedRecord(IndexedRecord currentValue, Review comment: ok ########## File path: hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/mergeInto.scala ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.analysis.UnresolvedException +import org.apache.spark.sql.catalyst.expressions.{Expression, Unevaluable} +import org.apache.spark.sql.types.DataType + +// This code is just copy from v2Commands.scala in spark 3.0 + +/** + * The logical plan of the MERGE INTO command that works for v2 tables. + */ +case class MergeIntoTable( Review comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
