vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468745957



##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured 
for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key 
or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String 
structName, String recordNamespace) {
+    // parse simple feilds
+    getRecordKeyFields().stream()
+        .filter(f -> !(f.contains(".")))
+        .forEach(f -> recordKeyPositions.put(f, 
Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+    // parse nested fields
+    getRecordKeyFields().stream()
+        .filter(f -> f.contains("."))
+        .forEach(f -> recordKeyPositions.put(f, 
RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+    // parse simple fields
+    if (getPartitionPathFields() != null) {
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> 
!(f.contains(".")))
+          .forEach(f -> partitionPathPositions.put(f,
+              Collections.singletonList((Integer) 
(structType.getFieldIndex(f).get()))));
+      // parse nested fields
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> 
f.contains("."))
+          .forEach(f -> partitionPathPositions.put(f,
+              RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, 
false)));
+    }
+    this.structName = structName;
+    this.structType = structType;
+    this.recordNamespace = recordNamespace;
+  }
+
+  /**
+   * Fetch record key from {@link Row}.
+   * @param row instance of {@link Row} from which record key is requested.
+   * @return the record key of interest from {@link Row}.
+   */
+  @Override
+  public String getRecordKey(Row row) {
+    if (null != converterFn) {

Review comment:
       As far as I can tell, this is private, set to null by default, and not 
assigned anywhere else, so we will never pass the `if (null != ..)` check. I think 
this should be `if (null == converterFn)` if the intention was lazy initialization.

##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured 
for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key 
or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String 
structName, String recordNamespace) {

Review comment:
       note to self: need to understand this better and see if we can simplify

##########
File path: 
hudi-client/src/test/java/org/apache/hudi/testutils/HoodieDatasetTestUtils.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.testutils;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+
+import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
+
+/**
+ * Dataset test utils.
+ */
+public class HoodieDatasetTestUtils {

Review comment:
       This is a misleading name. Need to rename this. It's unclear if it refers 
to a Hoodie dataset or to the Spark Dataset framework.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
##########
@@ -35,6 +35,7 @@
   // bulk insert
   BULK_INSERT("bulk_insert"),
   BULK_INSERT_PREPPED("bulk_insert_prepped"),
+  BULK_INSERT_DATASET("bulk_insert_dataset"),

Review comment:
       Need to understand why this is needed. I believe we pick a different mode 
for the writer path here. We should use a config and not overload further if 
possible.

##########
File path: hudi-client/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
##########
@@ -51,4 +53,32 @@ protected KeyGenerator(TypedProperties config) {
     throw new UnsupportedOperationException("Bootstrap not supported for key 
generator. "
         + "Please override this method in your custom key generator.");
   }
+
+  /**
+   * Initializes {@link KeyGenerator} for {@link Row} based operations.
+   * @param structType structype of the dataset.
+   * @param structName struct name of the dataset.
+   * @param recordNamespace record namespace of the dataset.
+   */
+  public void initializeRowKeyGenerator(StructType structType, String 
structName, String recordNamespace) {

Review comment:
       Can this just be passed to the `getRecordKey()` methods, or can we overload a 
constructor? Sticking a random `init()` method here is not very desirable. 
   
   Overall, this ties the KeyGenerator tightly to Spark. For example, when we do 
Flink, writing a key generator would require a Spark dependency for a Flink 
job. This needs more thought. 
   
   cc @bvaradar @leesf @nsivabalan 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieRowParquetWriteSupport.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.HashMap;
+
+import static 
org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static 
org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static 
org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static 
org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+
+/**
+ * Hoodie Write Support for directly writing Row to Parquet.
+ */
+public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
+
+  private Configuration hadoopConf;
+  private BloomFilter bloomFilter;
+  private String minRecordKey;
+  private String maxRecordKey;
+
+  public HoodieRowParquetWriteSupport(Configuration conf, StructType 
structType, BloomFilter bloomFilter) {
+    super();
+    Configuration hadoopConf = new Configuration(conf);
+    hadoopConf.set("spark.sql.parquet.writeLegacyFormat", "false");

Review comment:
       should we be hardcoding these?

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Internal Row implementation for Hoodie Row. It wraps an {@link InternalRow} 
and keeps meta columns locally. But the {@link InternalRow}
+ * does include the meta columns as well just that {@link HoodieInternalRow} 
will intercept queries for meta columns and serve from its
+ * copy rather than fetching from {@link InternalRow}.
+ */
+public class HoodieInternalRow extends InternalRow {
+
+  private String commitTime;
+  private String commitSeqNumber;
+  private String recordKey;
+  private String partitionPath;
+  private String fileName;
+  private InternalRow row;
+
+  public HoodieInternalRow(String commitTime, String commitSeqNumber, String 
recordKey, String partitionPath,
+      String fileName, InternalRow row) {
+    this.commitTime = commitTime;
+    this.commitSeqNumber = commitSeqNumber;
+    this.recordKey = recordKey;
+    this.partitionPath = partitionPath;
+    this.fileName = fileName;
+    this.row = row;
+  }
+
+  @Override
+  public int numFields() {
+    return row.numFields();
+  }
+
+  @Override
+  public void setNullAt(int i) {
+    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+      switch (i) {
+        case 0: {
+          this.commitTime = null;
+          break;
+        }
+        case 1: {
+          this.commitSeqNumber = null;
+          break;
+        }
+        case 2: {
+          this.recordKey = null;
+          break;
+        }
+        case 3: {
+          this.partitionPath = null;
+          break;
+        }
+        case 4: {
+          this.fileName = null;
+          break;
+        }
+        default: throw new IllegalArgumentException("Not expected");
+      }
+    } else {
+      row.setNullAt(i);

Review comment:
       Note to self: check if this is indeed correct. I was expecting us to do 
something like `row.setNullAt(i-5)`.

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -95,20 +95,20 @@ public boolean commit(String instantTime, 
JavaRDD<WriteStatus> writeStatuses) {
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
       Option<Map<String, String>> extraMetadata) {
-    HoodieTableMetaClient metaClient = createMetaClient(false);
-    return commit(instantTime, writeStatuses, extraMetadata, 
metaClient.getCommitActionType());
+    List<HoodieWriteStat> stats = 
writeStatuses.map(WriteStatus::getStat).collect();
+    return commitStat(instantTime, stats, extraMetadata);
   }
 
-  private boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses,
-      Option<Map<String, String>> extraMetadata, String actionType) {
-
+  // fixme(bulkinsertv2) this name is ughh
+  public boolean commitStat(String instantTime, List<HoodieWriteStat> stats, 
Option<Map<String, String>> extraMetadata) {

Review comment:
       got this when compiling
   
   ```
   Error:(433, 16) overloaded method value commit with alternatives:
     (x$1: String,x$2: 
java.util.List[org.apache.hudi.common.model.HoodieWriteStat],x$3: 
org.apache.hudi.common.util.Option[java.util.Map[String,String]])Boolean <and>
     (x$1: String,x$2: 
org.apache.spark.api.java.JavaRDD[org.apache.hudi.client.WriteStatus],x$3: 
org.apache.hudi.common.util.Option[java.util.Map[String,String]])Boolean
    cannot be applied to (String, 
org.apache.spark.api.java.JavaRDD[org.apache.hudi.client.WriteStatus], 
org.apache.hudi.common.util.Option[java.util.HashMap[String,String]])
           client.commit(instantTime, writeStatuses,
   ```

##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured 
for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
+  private Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
+
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
+  private String structName;
+  private String recordNamespace;
+
+  protected BuiltinKeyGenerator(TypedProperties config) {
+    super(config);
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key 
or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  @Override
+  public void initializeRowKeyGenerator(StructType structType, String 
structName, String recordNamespace) {
+    // parse simple feilds
+    getRecordKeyFields().stream()
+        .filter(f -> !(f.contains(".")))
+        .forEach(f -> recordKeyPositions.put(f, 
Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))));
+    // parse nested fields
+    getRecordKeyFields().stream()
+        .filter(f -> f.contains("."))
+        .forEach(f -> recordKeyPositions.put(f, 
RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+    // parse simple fields
+    if (getPartitionPathFields() != null) {
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> 
!(f.contains(".")))
+          .forEach(f -> partitionPathPositions.put(f,
+              Collections.singletonList((Integer) 
(structType.getFieldIndex(f).get()))));
+      // parse nested fields
+      getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> 
f.contains("."))
+          .forEach(f -> partitionPathPositions.put(f,
+              RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, 
false)));
+    }
+    this.structName = structName;
+    this.structType = structType;
+    this.recordNamespace = recordNamespace;
+  }
+
+  /**
+   * Fetch record key from {@link Row}.
+   * @param row instance of {@link Row} from which record key is requested.
+   * @return the record key of interest from {@link Row}.
+   */
+  @Override
+  public String getRecordKey(Row row) {
+    if (null != converterFn) {

Review comment:
       Also, this being in `BuiltinKeyGenerator` and not `KeyGenerator` is a 
problem and will break all the custom key generators out there when they turn 
on row based writing, correct? should we move this up?

##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured 
for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+
+  private Map<String, List<Integer>> recordKeyPositions = new HashMap<>();

Review comment:
       Is there a way to avoid using positions and use names instead?

##########
File path: hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
##########
@@ -267,26 +258,26 @@ public static HoodieWriteClient 
createHoodieClient(JavaSparkContext jssc, String
   }
 
   public static JavaRDD<WriteStatus> doDeleteOperation(HoodieWriteClient 
client, JavaRDD<HoodieKey> hoodieKeys,
-                                                       String instantTime) {
+      String instantTime) {
     return client.delete(hoodieKeys, instantTime);
   }
 
   public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable 
orderingVal, HoodieKey hKey,
-                                                String payloadClass) throws 
IOException {
+      String payloadClass) throws IOException {

Review comment:
       another general rule of thumb. we could always review our own diffs 
again before submitting to make sure whitespace changes are all intentional. cc 
@nsivabalan . 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Internal Row implementation for Hoodie Row. It wraps an {@link InternalRow} 
and keeps meta columns locally. But the {@link InternalRow}
+ * does include the meta columns as well just that {@link HoodieInternalRow} 
will intercept queries for meta columns and serve from its
+ * copy rather than fetching from {@link InternalRow}.
+ */
+public class HoodieInternalRow extends InternalRow {
+
+  private String commitTime;
+  private String commitSeqNumber;
+  private String recordKey;
+  private String partitionPath;
+  private String fileName;
+  private InternalRow row;
+
+  public HoodieInternalRow(String commitTime, String commitSeqNumber, String 
recordKey, String partitionPath,
+      String fileName, InternalRow row) {
+    this.commitTime = commitTime;
+    this.commitSeqNumber = commitSeqNumber;
+    this.recordKey = recordKey;
+    this.partitionPath = partitionPath;
+    this.fileName = fileName;
+    this.row = row;
+  }
+
+  @Override
+  public int numFields() {
+    return row.numFields();
+  }
+
+  @Override
+  public void setNullAt(int i) {
+    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+      switch (i) {
+        case 0: {
+          this.commitTime = null;
+          break;
+        }
+        case 1: {
+          this.commitSeqNumber = null;
+          break;
+        }
+        case 2: {
+          this.recordKey = null;
+          break;
+        }
+        case 3: {
+          this.partitionPath = null;
+          break;
+        }
+        case 4: {
+          this.fileName = null;
+          break;
+        }
+        default: throw new IllegalArgumentException("Not expected");
+      }
+    } else {
+      row.setNullAt(i);
+    }
+  }
+
+  @Override
+  public void update(int i, Object value) {
+    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+      switch (i) {
+        case 0: {
+          this.commitTime = value.toString();
+          break;
+        }
+        case 1: {
+          this.commitSeqNumber = value.toString();
+          break;
+        }
+        case 2: {
+          this.recordKey = value.toString();
+          break;
+        }
+        case 3: {
+          this.partitionPath = value.toString();
+          break;
+        }
+        case 4: {
+          this.fileName = value.toString();
+          break;
+        }
+        default: throw new IllegalArgumentException("Not expected");
+      }
+    } else {
+      row.update(i, value);
+    }
+  }
+
+  private String getHoodieColumnVal(int ordinal) {

Review comment:
       rename to `getMetaColumnVal` 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -411,11 +443,11 @@ private[hudi] object HoodieSparkSqlWriter {
 
       val asyncCompactionEnabled = isAsyncCompactionEnabled(client, 
tableConfig, parameters, jsc.hadoopConfiguration())
       val compactionInstant : common.util.Option[java.lang.String] =
-      if (asyncCompactionEnabled) {
-        client.scheduleCompaction(common.util.Option.of(new 
util.HashMap[String, String](mapAsJavaMap(metaMap))))
-      } else {
-        common.util.Option.empty()
-      }
+        if (asyncCompactionEnabled) {

Review comment:
       these are the legit indentations. there is a PR open for scala style.

##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
##########
@@ -55,21 +51,22 @@ public SimpleKeyGenerator(TypedProperties props, String 
partitionPathField) {
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, recordKeyField);
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0));

Review comment:
       might be good to assert this out in the constructor itself

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -95,20 +95,20 @@ public boolean commit(String instantTime, 
JavaRDD<WriteStatus> writeStatuses) {
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
       Option<Map<String, String>> extraMetadata) {
-    HoodieTableMetaClient metaClient = createMetaClient(false);
-    return commit(instantTime, writeStatuses, extraMetadata, 
metaClient.getCommitActionType());
+    List<HoodieWriteStat> stats = 
writeStatuses.map(WriteStatus::getStat).collect();
+    return commitStat(instantTime, stats, extraMetadata);
   }
 
-  private boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses,
-      Option<Map<String, String>> extraMetadata, String actionType) {
-
+  // fixme(bulkinsertv2) this name is ughh
+  public boolean commitStat(String instantTime, List<HoodieWriteStat> stats, 
Option<Map<String, String>> extraMetadata) {

Review comment:
       Looks like we cannot avoid a new public API, so we might as well rename it.

##########
File path: 
hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
##########
@@ -34,13 +36,28 @@ import org.scalatest.Assertions.fail
 class TestDataSourceDefaults {
 
   val schema = SchemaTestUtil.getComplexEvolvedSchema
+  val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)

Review comment:
       @nsivabalan can you please file a JIRA for this?

##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
##########
@@ -177,4 +191,26 @@ private long convertLongTimeToMillis(Long partitionVal) {
     }
     return MILLISECONDS.convert(partitionVal, timeUnit);
   }
+
+  @Override
+  public String getRecordKey(Row row) {
+    return RowKeyGeneratorHelper.getRecordKeyFromRow(row, 
getRecordKeyFields(), getRecordKeyPositions(), false);
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    Object fieldVal = null;
+    Object partitionPathFieldVal =  
RowKeyGeneratorHelper.getNestedFieldVal(row, 
getPartitionPathPositions().get(getPartitionPathFields().get(0)));

Review comment:
       is the .get(0) really fine?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -119,21 +119,19 @@ class DefaultSource extends RelationProvider
                               optParams: Map[String, String],
                               df: DataFrame): BaseRelation = {
     val parameters = 
HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
-
     if (parameters(OPERATION_OPT_KEY).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
       HoodieSparkSqlWriter.bootstrap(sqlContext, mode, parameters, df)
     } else {
       HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
     }
-
     new HoodieEmptyRelation(sqlContext, df.schema)
   }
 
   override def createSink(sqlContext: SQLContext,
                           optParams: Map[String, String],
                           partitionColumns: Seq[String],
                           outputMode: OutputMode): Sink = {
-    val parameters = 
HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
+    val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)

Review comment:
       this needs to be reconciled with changes in the original method. oh my. 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -670,7 +670,9 @@ public Builder withPath(String basePath) {
     }
 
     public Builder withSchema(String schemaStr) {
-      props.setProperty(AVRO_SCHEMA, schemaStr);
+      if (null != schemaStr) {

Review comment:
       @bvaradar  @nsivabalan why would this be null?

##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
##########
@@ -129,45 +134,54 @@ public String getPartitionPath(GenericRecord record) {
     if (partitionVal == null) {
       partitionVal = 1L;
     }
+    try {
+      return getPartitionPath(partitionVal);
+    } catch (Exception e) {
+      throw new HoodieDeltaStreamerException("Unable to parse input partition 
field :" + partitionVal, e);
+    }
+  }
 
+  /**
+   * Parse and fetch partition path based on data type.
+   *
+   * @param partitionVal partition path object value fetched from record/row
+   * @return the parsed partition path based on data type
+   * @throws ParseException on any parse exception
+   */
+  private String getPartitionPath(Object partitionVal) throws ParseException {

Review comment:
       need to look at this line-by-line again and see if it's all good.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to