vinothchandar commented on a change in pull request #3247: URL: https://github.com/apache/hudi/pull/3247#discussion_r670175121
########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieAppendOnlyRowCreateHandle.java ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.storage.HoodieInternalRowFileWriter; +import org.apache.hudi.io.storage.HoodieInternalRowFileWriterFactory; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; + +import java.io.IOException; + +/** + * RowCreateHandle to be used when meta columns are disabled. + */ +public class HoodieAppendOnlyRowCreateHandle extends HoodieRowCreateHandle { Review comment: rename: `HoodieNoMetaRowCreateHandle` or sth? lets not leak higher level use-cases into class names low down the stack? Row create handle implies we are appending data? ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieAppendOnlyRowCreateHandle.java ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.storage.HoodieInternalRowFileWriter; +import org.apache.hudi.io.storage.HoodieInternalRowFileWriterFactory; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; + +import java.io.IOException; + +/** + * RowCreateHandle to be used when meta columns are disabled. + */ +public class HoodieAppendOnlyRowCreateHandle extends HoodieRowCreateHandle { + + public HoodieAppendOnlyRowCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId, String instantTime, + int taskPartitionId, long taskId, long taskEpochId, StructType structType) { + super(table, writeConfig, partitionPath, fileId, instantTime, taskPartitionId, taskId, taskEpochId, structType); + } + + /** + * Write the incoming InternalRow as is. + * + * @param record instance of {@link InternalRow} that needs to be written to the fileWriter. + * @throws IOException + */ + @Override + public void write(InternalRow record) throws IOException { + try { + fileWriter.writeRow("", record); Review comment: what are all the empty string? can we avoid such calls? 
########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieAppendOnlyInternalRowParquetWriter.java ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.catalyst.InternalRow; + +import java.io.IOException; + +public class HoodieAppendOnlyInternalRowParquetWriter extends HoodieInternalRowParquetWriter { Review comment: rename this too? ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java ########## @@ -116,5 +185,24 @@ void buildFieldPositionMapIfNeeded(StructType structType) { this.structType = structType; } } + + synchronized void buildFieldDataTypesMapIfNeeded(StructType structType) { Review comment: why do we synchronize? any static state that needs to be protected? 
Would be good to avoid if possible ########## File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java ########## @@ -675,6 +676,11 @@ public PropertyBuilder setBootstrapBasePath(String bootstrapBasePath) { return this; } + public PropertyBuilder setPopulateMetaColumns(boolean populateMetaColumns) { Review comment: we call the constants - metaFields? So should we standardize on `field` over `column` everywhere? ########## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala ########## @@ -128,14 +128,35 @@ object HoodieSparkSqlWriter { .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY)) .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null)) .setPartitionColumns(partitionColumns) + .setPopulateMetaColumns(parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(), HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean) .initTable(sparkContext.hadoopConfiguration, path.get) tableConfig = tableMetaClient.getTableConfig + } else { + // validate table properties + val tableMetaClient = HoodieTableMetaClient.builder().setBasePath(path.get).setConf(sparkContext.hadoopConfiguration).build() + if (!tableMetaClient.getTableConfig.populateMetaColumns() && + parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(), HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean) { + throw new HoodieException(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key() + " already disabled for the table. Can't be enabled back. ") Review comment: can't be re-enabled. 
########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieInternalRowFileWriterFactory.java ########## @@ -27,24 +27,24 @@ import org.apache.hadoop.fs.Path; import org.apache.spark.sql.types.StructType; -import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; - import java.io.IOException; +import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; + /** * Factory to assist in instantiating a new {@link HoodieInternalRowFileWriter}. */ public class HoodieInternalRowFileWriterFactory { /** * Factory method to assist in instantiating an instance of {@link HoodieInternalRowFileWriter}. - * @param path path of the RowFileWriter. + * + * @param path path of the RowFileWriter. * @param hoodieTable instance of {@link HoodieTable} in use. Review comment: can we please avoid all the whitespace changes. It makes review actually pretty hard. ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieAppendOnlyRowCreateHandle.java ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io; + +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.storage.HoodieInternalRowFileWriter; +import org.apache.hudi.io.storage.HoodieInternalRowFileWriterFactory; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; + +import java.io.IOException; + +/** + * RowCreateHandle to be used when meta columns are disabled. + */ +public class HoodieAppendOnlyRowCreateHandle extends HoodieRowCreateHandle { + + public HoodieAppendOnlyRowCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId, String instantTime, + int taskPartitionId, long taskId, long taskEpochId, StructType structType) { + super(table, writeConfig, partitionPath, fileId, instantTime, taskPartitionId, taskId, taskEpochId, structType); + } + + /** + * Write the incoming InternalRow as is. + * + * @param record instance of {@link InternalRow} that needs to be written to the fileWriter. + * @throws IOException + */ + @Override + public void write(InternalRow record) throws IOException { + try { + fileWriter.writeRow("", record); + writeStatus.markSuccess(""); + } catch (Throwable ge) { + writeStatus.setGlobalError(ge); + throw ge; Review comment: wrap into a HoodieException? 
########## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala ########## @@ -128,14 +128,35 @@ object HoodieSparkSqlWriter { .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY)) .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null)) .setPartitionColumns(partitionColumns) + .setPopulateMetaColumns(parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(), HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean) .initTable(sparkContext.hadoopConfiguration, path.get) tableConfig = tableMetaClient.getTableConfig + } else { + // validate table properties + val tableMetaClient = HoodieTableMetaClient.builder().setBasePath(path.get).setConf(sparkContext.hadoopConfiguration).build() Review comment: In any case, please pull all this validation into its own method. ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieInternalRowFileWriterFactory.java ########## @@ -60,20 +60,46 @@ private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter( Path path, HoodieWriteConfig writeConfig, StructType structType, HoodieTable table) throws IOException { BloomFilter filter = BloomFilterFactory.createBloomFilter( - writeConfig.getBloomFilterNumEntries(), - writeConfig.getBloomFilterFPP(), - writeConfig.getDynamicBloomFilterMaxNumEntries(), - writeConfig.getBloomFilterType()); + writeConfig.getBloomFilterNumEntries(), + writeConfig.getBloomFilterFPP(), + writeConfig.getDynamicBloomFilterMaxNumEntries(), + writeConfig.getBloomFilterType()); HoodieRowParquetWriteSupport writeSupport = - new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter); + new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter); return new HoodieInternalRowParquetWriter( path, new HoodieRowParquetConfig( - writeSupport, - writeConfig.getParquetCompressionCodec(), - 
writeConfig.getParquetBlockSize(), - writeConfig.getParquetPageSize(), - writeConfig.getParquetMaxFileSize(), - writeSupport.getHadoopConf(), - writeConfig.getParquetCompressionRatio())); + writeSupport, + writeConfig.getParquetCompressionCodec(), + writeConfig.getParquetBlockSize(), + writeConfig.getParquetPageSize(), + writeConfig.getParquetMaxFileSize(), + writeSupport.getHadoopConf(), + writeConfig.getParquetCompressionRatio())); + } + + public static HoodieInternalRowFileWriter getInternalRowAppendOnlyFileWriter( + Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType schema) + throws IOException { + final String extension = FSUtils.getFileExtension(path.getName()); + if (PARQUET.getFileExtension().equals(extension)) { + return newParquetInternalRowAppendOnlyFileWriter(path, config, schema, hoodieTable); + } + throw new UnsupportedOperationException(extension + " format not supported yet."); Review comment: Always throw Hoodie specific versions of the exceptions please ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java ########## @@ -18,48 +18,71 @@ package org.apache.hudi.keygen; -import org.apache.avro.generic.GenericRecord; import org.apache.hudi.ApiMaturityLevel; import org.apache.hudi.AvroConversionHelper; import org.apache.hudi.PublicAPIMethod; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieKeyException; + +import org.apache.avro.generic.GenericRecord; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.package$; import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$; +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; +import 
org.apache.spark.sql.catalyst.expressions.Attribute; +import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.StructType; -import scala.Function1; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + +import scala.Function1; +import scala.collection.JavaConversions; +import scala.collection.JavaConverters; /** * Base class for the built-in key generators. Contains methods structured for * code reuse amongst them. */ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements SparkKeyGeneratorInterface { + private static final Logger LOG = LogManager.getLogger(BuiltinKeyGenerator.class); + private static final String STRUCT_NAME = "hoodieRowTopLevelField"; private static final String NAMESPACE = "hoodieRow"; private transient Function1<Object, Object> converterFn = null; protected StructType structType; + protected ExpressionEncoder encoder; protected Map<String, List<Integer>> recordKeyPositions = new HashMap<>(); protected Map<String, List<Integer>> partitionPathPositions = new HashMap<>(); + protected Map<String, List<DataType>> partitionPathDataTypes = null; protected BuiltinKeyGenerator(TypedProperties config) { super(config); } /** * Fetch record key from {@link Row}. + * * @param row instance of {@link Row} from which record key is requested. * @return the record key of interest from {@link Row}. */ @Override @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) public String getRecordKey(Row row) { if (null == converterFn) { + LOG.warn("Instantiating row converter fn 11 "); Review comment: remove all debug logging like this? 
########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieInternalRowFileWriterFactory.java ########## @@ -60,20 +60,46 @@ private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter( Path path, HoodieWriteConfig writeConfig, StructType structType, HoodieTable table) throws IOException { BloomFilter filter = BloomFilterFactory.createBloomFilter( - writeConfig.getBloomFilterNumEntries(), - writeConfig.getBloomFilterFPP(), - writeConfig.getDynamicBloomFilterMaxNumEntries(), - writeConfig.getBloomFilterType()); + writeConfig.getBloomFilterNumEntries(), + writeConfig.getBloomFilterFPP(), + writeConfig.getDynamicBloomFilterMaxNumEntries(), + writeConfig.getBloomFilterType()); HoodieRowParquetWriteSupport writeSupport = - new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter); + new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter); return new HoodieInternalRowParquetWriter( path, new HoodieRowParquetConfig( - writeSupport, - writeConfig.getParquetCompressionCodec(), - writeConfig.getParquetBlockSize(), - writeConfig.getParquetPageSize(), - writeConfig.getParquetMaxFileSize(), - writeSupport.getHadoopConf(), - writeConfig.getParquetCompressionRatio())); + writeSupport, + writeConfig.getParquetCompressionCodec(), + writeConfig.getParquetBlockSize(), + writeConfig.getParquetPageSize(), + writeConfig.getParquetMaxFileSize(), + writeSupport.getHadoopConf(), + writeConfig.getParquetCompressionRatio())); + } + + public static HoodieInternalRowFileWriter getInternalRowAppendOnlyFileWriter( + Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType schema) + throws IOException { + final String extension = FSUtils.getFileExtension(path.getName()); Review comment: can we check this based on base file format of the `hoodieTable` that's better than relying on the file extension ########## File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieAppendOnlyRowParquetWriteSupport.java ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.spark.sql.types.StructType; + +import java.util.Collections; + +/** + * Hoodie Write Support for directly writing Row to Parquet. + */ +public class HoodieAppendOnlyRowParquetWriteSupport extends HoodieRowParquetWriteSupport { Review comment: rename? ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieAppendOnlyRowCreateHandle.java ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; Review comment: move this to org.apache.hudi.io.row? ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java ########## @@ -81,7 +105,52 @@ public String getPartitionPath(Row row) { return getKey(genericRecord).getPartitionPath(); } - void buildFieldPositionMapIfNeeded(StructType structType) { + /** + * Fetch partition path from {@link InternalRow}. + * + * @param internalRow {@link InternalRow} instance from which partition path needs to be fetched from. + * @param structType schema of the internalRow. + * @return the partition path. + */ + public String getPartitionPath(InternalRow internalRow, StructType structType) { + try { + Row row = deserializeRow(getEncoder(structType), internalRow); + return getPartitionPath(row); + } catch (Exception e) { + throw new HoodieIOException("Conversion of InternalRow to Row failed with exception " + e); + } + } + + private ExpressionEncoder getEncoder(StructType structType) { + if (encoder == null) { + encoder = getRowEncoder(structType); + } + return encoder; + } + + private static ExpressionEncoder getRowEncoder(StructType schema) { Review comment: rename to `void initEncoder()` ? 
########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java ########## @@ -81,7 +105,52 @@ public String getPartitionPath(Row row) { return getKey(genericRecord).getPartitionPath(); } - void buildFieldPositionMapIfNeeded(StructType structType) { + /** + * Fetch partition path from {@link InternalRow}. + * + * @param internalRow {@link InternalRow} instance from which partition path needs to be fetched from. + * @param structType schema of the internalRow. + * @return the partition path. + */ + public String getPartitionPath(InternalRow internalRow, StructType structType) { + try { + Row row = deserializeRow(getEncoder(structType), internalRow); + return getPartitionPath(row); + } catch (Exception e) { + throw new HoodieIOException("Conversion of InternalRow to Row failed with exception " + e); + } + } + + private ExpressionEncoder getEncoder(StructType structType) { + if (encoder == null) { + encoder = getRowEncoder(structType); + } + return encoder; + } + + private static ExpressionEncoder getRowEncoder(StructType schema) { + List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream() + .map(Attribute::toAttribute).collect(Collectors.toList()); + return RowEncoder.apply(schema) Review comment: have you tested this with both spark 3 and 2 ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieInternalRowFileWriterFactory.java ########## @@ -60,20 +60,46 @@ private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter( Path path, HoodieWriteConfig writeConfig, StructType structType, HoodieTable table) throws IOException { BloomFilter filter = BloomFilterFactory.createBloomFilter( - writeConfig.getBloomFilterNumEntries(), - writeConfig.getBloomFilterFPP(), - writeConfig.getDynamicBloomFilterMaxNumEntries(), - writeConfig.getBloomFilterType()); + writeConfig.getBloomFilterNumEntries(), + writeConfig.getBloomFilterFPP(), + 
writeConfig.getDynamicBloomFilterMaxNumEntries(), + writeConfig.getBloomFilterType()); HoodieRowParquetWriteSupport writeSupport = - new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter); + new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter); return new HoodieInternalRowParquetWriter( path, new HoodieRowParquetConfig( - writeSupport, - writeConfig.getParquetCompressionCodec(), - writeConfig.getParquetBlockSize(), - writeConfig.getParquetPageSize(), - writeConfig.getParquetMaxFileSize(), - writeSupport.getHadoopConf(), - writeConfig.getParquetCompressionRatio())); + writeSupport, + writeConfig.getParquetCompressionCodec(), + writeConfig.getParquetBlockSize(), + writeConfig.getParquetPageSize(), + writeConfig.getParquetMaxFileSize(), + writeSupport.getHadoopConf(), + writeConfig.getParquetCompressionRatio())); + } + + public static HoodieInternalRowFileWriter getInternalRowAppendOnlyFileWriter( Review comment: rename ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java ########## @@ -79,4 +82,28 @@ public String getPartitionPath(Row row) { throw new HoodieKeyGeneratorException("Unable to parse input partition field :" + fieldVal, e); } } + + @Override + public String getPartitionPath(InternalRow internalRow, StructType structType) { Review comment: can we see if we can simply/reuse more code across these key generator classes? ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java ########## @@ -81,7 +105,52 @@ public String getPartitionPath(Row row) { return getKey(genericRecord).getPartitionPath(); } - void buildFieldPositionMapIfNeeded(StructType structType) { + /** + * Fetch partition path from {@link InternalRow}. + * + * @param internalRow {@link InternalRow} instance from which partition path needs to be fetched from. + * @param structType schema of the internalRow. 
+ * @return the partition path. + */ + public String getPartitionPath(InternalRow internalRow, StructType structType) { + try { + Row row = deserializeRow(getEncoder(structType), internalRow); + return getPartitionPath(row); + } catch (Exception e) { + throw new HoodieIOException("Conversion of InternalRow to Row failed with exception " + e); + } + } + + private ExpressionEncoder getEncoder(StructType structType) { + if (encoder == null) { + encoder = getRowEncoder(structType); + } + return encoder; + } + + private static ExpressionEncoder getRowEncoder(StructType schema) { + List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream() + .map(Attribute::toAttribute).collect(Collectors.toList()); + return RowEncoder.apply(schema) + .resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(), + SimpleAnalyzer$.MODULE$); + } + + private static Row deserializeRow(ExpressionEncoder encoder, InternalRow row) + throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, ClassNotFoundException { + // TODO remove reflection if Spark 2.x support is dropped + if (package$.MODULE$.SPARK_VERSION().startsWith("2.")) { Review comment: I think there is a similar method in `spark-common` like this? Can we try and move that up here, so we can avoid this code duplication? It ll become harder and harder to maintain ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java ########## @@ -81,7 +105,52 @@ public String getPartitionPath(Row row) { return getKey(genericRecord).getPartitionPath(); } - void buildFieldPositionMapIfNeeded(StructType structType) { + /** + * Fetch partition path from {@link InternalRow}. + * + * @param internalRow {@link InternalRow} instance from which partition path needs to be fetched from. + * @param structType schema of the internalRow. + * @return the partition path. 
+ */ + public String getPartitionPath(InternalRow internalRow, StructType structType) { + try { + Row row = deserializeRow(getEncoder(structType), internalRow); + return getPartitionPath(row); + } catch (Exception e) { + throw new HoodieIOException("Conversion of InternalRow to Row failed with exception " + e); + } + } + + private ExpressionEncoder getEncoder(StructType structType) { + if (encoder == null) { Review comment: ```Java if (encoder == null) { initEncoder(); } return encoder; ``` it's already an instance variable, why pass it back and forth between private methods? ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java ########## @@ -72,4 +75,16 @@ public String getPartitionPath(Row row) { return RowKeyGeneratorHelper.getPartitionPathFromRow(row, getPartitionPathFields(), hiveStylePartitioning, partitionPathPositions); } + + @Override + public String getPartitionPath(InternalRow row, StructType structType) { + buildFieldDataTypesMapIfNeeded(structType); Review comment: is it possible to pass the `structType` just once during creation of the key generator? 
########## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala ########## @@ -128,14 +128,35 @@ object HoodieSparkSqlWriter { .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY)) .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null)) .setPartitionColumns(partitionColumns) + .setPopulateMetaColumns(parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(), HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean) .initTable(sparkContext.hadoopConfiguration, path.get) tableConfig = tableMetaClient.getTableConfig + } else { + // validate table properties + val tableMetaClient = HoodieTableMetaClient.builder().setBasePath(path.get).setConf(sparkContext.hadoopConfiguration).build() Review comment: is there other places where we do such validation. I think this should be done at the WriteClient level or so? not this high up the stack? ########## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala ########## @@ -128,14 +128,35 @@ object HoodieSparkSqlWriter { .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY)) .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null)) .setPartitionColumns(partitionColumns) + .setPopulateMetaColumns(parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(), HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean) .initTable(sparkContext.hadoopConfiguration, path.get) tableConfig = tableMetaClient.getTableConfig + } else { + // validate table properties + val tableMetaClient = HoodieTableMetaClient.builder().setBasePath(path.get).setConf(sparkContext.hadoopConfiguration).build() + if (!tableMetaClient.getTableConfig.populateMetaColumns() && + parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(), HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean) { + 
throw new HoodieException(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key() + " already disabled for the table. Can't be enabled back. ") + } + } + + if ( !parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(), HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean + && operation != WriteOperationType.BULK_INSERT) { + throw new HoodieException(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key() + " can only be enabled for " + WriteOperationType.BULK_INSERT Review comment: you mean - can only be disabled for? ########## File path: hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java ########## @@ -103,20 +161,22 @@ private HoodieRowCreateHandle getRowCreateHandle(String partitionPath) throws IO if (arePartitionRecordsSorted) { close(); } - handles.put(partitionPath, new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(), + handles.put(partitionPath, populateMetaColumns ? new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(), Review comment: lets pull out the ternary operator into its own line.. 
and have a variable to hold the create handle ########## File path: hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java ########## @@ -52,30 +62,78 @@ private final StructType structType; private final Boolean arePartitionRecordsSorted; private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>(); - + private final boolean populateMetaColumns; private HoodieRowCreateHandle handle; private String lastKnownPartitionPath = null; private String fileIdPrefix; private int numFilesWritten = 0; private Map<String, HoodieRowCreateHandle> handles = new HashMap<>(); + private BuiltinKeyGenerator keyGenerator = null; + private boolean simpleKeyGen = false; + private int simplePartitionFieldIndex = -1; + private DataType simplePartitionFieldDataType; public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig, - String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType, boolean arePartitionRecordsSorted) { + String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType, + boolean populateMetaColumns, boolean arePartitionRecordsSorted) { this.hoodieTable = hoodieTable; this.writeConfig = writeConfig; this.instantTime = instantTime; this.taskPartitionId = taskPartitionId; this.taskId = taskId; this.taskEpochId = taskEpochId; this.structType = structType; + this.populateMetaColumns = populateMetaColumns; this.arePartitionRecordsSorted = arePartitionRecordsSorted; this.fileIdPrefix = UUID.randomUUID().toString(); + if (!populateMetaColumns) { + this.keyGenerator = getKeyGenerator(writeConfig.getProps()); + if (keyGenerator instanceof SimpleKeyGenerator) { + simpleKeyGen = true; + simplePartitionFieldIndex = (Integer) structType.getFieldIndex((keyGenerator).getPartitionPathFields().get(0)).get(); + simplePartitionFieldDataType = structType.fields()[simplePartitionFieldIndex].dataType(); + } + 
} + } + + /** + * Instantiate {@link BuiltinKeyGenerator}. + * + * @param properties properties map. + * @return the key generator thus instantiated. + */ + private BuiltinKeyGenerator getKeyGenerator(Properties properties) { + TypedProperties typedProperties = new TypedProperties(); + typedProperties.putAll(properties); + if (properties.get(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY().key()).equals(NonpartitionedKeyGenerator.class.getName())) { + return null; // Do not instantiate NonPartitionKeyGen + } else { + try { + return (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties); + } catch (ClassCastException cce) { + throw new HoodieIOException("Only those key gens implementing BuiltInKeyGenerator interface is supported in disabling meta columns path"); + } catch (IOException e) { + throw new HoodieIOException("Key generator instantiation failed ", e); + } + } } public void write(InternalRow record) throws IOException { try { - String partitionPath = record.getUTF8String( - HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString(); + String partitionPath = null; + if (populateMetaColumns) { // usual path where meta columns are pre populated in prep step. + partitionPath = record.getUTF8String( + HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString(); + } else { // if meta columns are disabled. + if (keyGenerator == null) { // NoPartitionerKeyGen Review comment: argh ########## File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java ########## @@ -138,6 +138,11 @@ .noDefaultValue() .withDocumentation("Base path of the dataset that needs to be bootstrapped as a Hudi table"); + public static final ConfigProperty<String> HOODIE_POPULATE_META_COLUMNS = ConfigProperty + .key("hoodie.populate.meta.columns") + .defaultValue("true") + .withDocumentation("When enabled, populates all meta columns. 
When disabled, no meta columns are populated"); Review comment: add `.. and incremental queries will not be functional. This is only meant to be used for append only/immutable data for batch processing` ########## File path: hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java ########## @@ -52,30 +62,78 @@ private final StructType structType; private final Boolean arePartitionRecordsSorted; private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>(); - + private final boolean populateMetaColumns; private HoodieRowCreateHandle handle; private String lastKnownPartitionPath = null; private String fileIdPrefix; private int numFilesWritten = 0; private Map<String, HoodieRowCreateHandle> handles = new HashMap<>(); + private BuiltinKeyGenerator keyGenerator = null; + private boolean simpleKeyGen = false; + private int simplePartitionFieldIndex = -1; + private DataType simplePartitionFieldDataType; public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig, - String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType, boolean arePartitionRecordsSorted) { + String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType, + boolean populateMetaColumns, boolean arePartitionRecordsSorted) { this.hoodieTable = hoodieTable; this.writeConfig = writeConfig; this.instantTime = instantTime; this.taskPartitionId = taskPartitionId; this.taskId = taskId; this.taskEpochId = taskEpochId; this.structType = structType; + this.populateMetaColumns = populateMetaColumns; this.arePartitionRecordsSorted = arePartitionRecordsSorted; this.fileIdPrefix = UUID.randomUUID().toString(); + if (!populateMetaColumns) { + this.keyGenerator = getKeyGenerator(writeConfig.getProps()); + if (keyGenerator instanceof SimpleKeyGenerator) { + simpleKeyGen = true; + simplePartitionFieldIndex = (Integer) 
structType.getFieldIndex((keyGenerator).getPartitionPathFields().get(0)).get(); + simplePartitionFieldDataType = structType.fields()[simplePartitionFieldIndex].dataType(); + } + } + } + + /** + * Instantiate {@link BuiltinKeyGenerator}. + * + * @param properties properties map. + * @return the key generator thus instantiated. + */ + private BuiltinKeyGenerator getKeyGenerator(Properties properties) { + TypedProperties typedProperties = new TypedProperties(); + typedProperties.putAll(properties); + if (properties.get(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY().key()).equals(NonpartitionedKeyGenerator.class.getName())) { + return null; // Do not instantiate NonPartitionKeyGen + } else { + try { + return (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties); + } catch (ClassCastException cce) { + throw new HoodieIOException("Only those key gens implementing BuiltInKeyGenerator interface is supported in disabling meta columns path"); Review comment: please write the error messages from a user facing angle. `meta columns path` can be a confusing thing to read, when all users do is deal with file/folder paths. 
########## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala ########## @@ -128,14 +128,35 @@ object HoodieSparkSqlWriter { .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY)) .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null)) .setPartitionColumns(partitionColumns) + .setPopulateMetaColumns(parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(), HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean) .initTable(sparkContext.hadoopConfiguration, path.get) tableConfig = tableMetaClient.getTableConfig + } else { + // validate table properties + val tableMetaClient = HoodieTableMetaClient.builder().setBasePath(path.get).setConf(sparkContext.hadoopConfiguration).build() + if (!tableMetaClient.getTableConfig.populateMetaColumns() && + parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(), HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean) { + throw new HoodieException(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key() + " already disabled for the table. Can't be enabled back. ") + } + } + + if ( !parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key(), HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.defaultValue()).toBoolean + && operation != WriteOperationType.BULK_INSERT) { + throw new HoodieException(HoodieTableConfig.HOODIE_POPULATE_META_COLUMNS.key() + " can only be enabled for " + WriteOperationType.BULK_INSERT + + " operation"); } val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType) // short-circuit if bulk_insert via row is enabled. // scalastyle:off + if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER_OPT_KEY) && Review comment: please move the entire row writing block into its own method. 
########## File path: hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java ########## @@ -112,4 +116,40 @@ return bulkInsertPartitionerRows.repartitionRecords(colOrderedDataset, config.getBulkInsertShuffleParallelism()); } + + /** + * Add empty meta columns and reorder such that meta columns are at the beginning. + * + * @param rows + * @return + */ + public static Dataset<Row> prepareHoodieDatasetForBulkInsertAppendOnly(Dataset<Row> rows) { Review comment: `prepareHoodieDatasetForBulkInsertWithoutMetaFields`, so we have just one term for this fork in the code path ########## File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala ########## @@ -298,7 +298,7 @@ object DataSourceWriteOptions { val ENABLE_ROW_WRITER_OPT_KEY: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.write.row.writer.enable") .defaultValue("false") - .withDocumentation("") + .withDocumentation("Will perform write operations directly using the spark native `Row` representation") Review comment: I fixed this already. Please rebase and consolidate. 
########## File path: hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java ########## @@ -52,30 +62,78 @@ private final StructType structType; private final Boolean arePartitionRecordsSorted; private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>(); - + private final boolean populateMetaColumns; private HoodieRowCreateHandle handle; private String lastKnownPartitionPath = null; private String fileIdPrefix; private int numFilesWritten = 0; private Map<String, HoodieRowCreateHandle> handles = new HashMap<>(); + private BuiltinKeyGenerator keyGenerator = null; + private boolean simpleKeyGen = false; + private int simplePartitionFieldIndex = -1; + private DataType simplePartitionFieldDataType; public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig, - String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType, boolean arePartitionRecordsSorted) { + String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType, + boolean populateMetaColumns, boolean arePartitionRecordsSorted) { this.hoodieTable = hoodieTable; this.writeConfig = writeConfig; this.instantTime = instantTime; this.taskPartitionId = taskPartitionId; this.taskId = taskId; this.taskEpochId = taskEpochId; this.structType = structType; + this.populateMetaColumns = populateMetaColumns; this.arePartitionRecordsSorted = arePartitionRecordsSorted; this.fileIdPrefix = UUID.randomUUID().toString(); + if (!populateMetaColumns) { + this.keyGenerator = getKeyGenerator(writeConfig.getProps()); + if (keyGenerator instanceof SimpleKeyGenerator) { + simpleKeyGen = true; + simplePartitionFieldIndex = (Integer) structType.getFieldIndex((keyGenerator).getPartitionPathFields().get(0)).get(); + simplePartitionFieldDataType = structType.fields()[simplePartitionFieldIndex].dataType(); + } + } + } + + /** + * Instantiate {@link 
BuiltinKeyGenerator}. + * + * @param properties properties map. + * @return the key generator thus instantiated. + */ + private BuiltinKeyGenerator getKeyGenerator(Properties properties) { + TypedProperties typedProperties = new TypedProperties(); + typedProperties.putAll(properties); + if (properties.get(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY().key()).equals(NonpartitionedKeyGenerator.class.getName())) { + return null; // Do not instantiate NonPartitionKeyGen Review comment: Please refrain from using `null` as return value. We have `Option` for that, without NPE business ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java ########## @@ -72,4 +75,16 @@ public String getPartitionPath(Row row) { return RowKeyGeneratorHelper.getPartitionPathFromRow(row, getPartitionPathFields(), hiveStylePartitioning, partitionPathPositions); } + + @Override + public String getPartitionPath(InternalRow row, StructType structType) { + buildFieldDataTypesMapIfNeeded(structType); Review comment: naive question. `InternalRow` does not have the schema with itself.? row.getSchema? ########## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala ########## @@ -377,6 +398,60 @@ object HoodieSparkSqlWriter { (syncHiveSuccess, common.util.Option.ofNullable(instantTime)) } + def bulkInsertAsRowNoMetaColumns(sqlContext: SQLContext, Review comment: lets please unify this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
