nsivabalan commented on a change in pull request #2616:
URL: https://github.com/apache/hudi/pull/2616#discussion_r589855348



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/RangePartitionAvroKeyGenerator.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.generic.GenericRecord;
+
+public class RangePartitionAvroKeyGenerator extends SimpleAvroKeyGenerator {
+
+  private final Long rangePerBucket;
+  private final String partitionName;
+
+  public static class Config {
+    public static final String RANGE_PER_PARTITION_PROP = 
"hoodie.keygen.range.partition.num";
+    public static final Long DEFAULT_RANGE_PER_PARTITION = 100000L;
+    public static final String RANGE_PARTITION_NAME_PROP = 
"hoodie.keygen.range.partition.name";
+    public static final String DEFAULT_RANGE_PARTITION_NAME = "rangePartition";
+  }
+
+  public RangePartitionAvroKeyGenerator(TypedProperties config) {
+    this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY),
+        config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY));
+  }
+
+  RangePartitionAvroKeyGenerator(TypedProperties config, String 
partitionPathField) {
+    this(config, null, partitionPathField);
+  }
+
+  RangePartitionAvroKeyGenerator(TypedProperties config, String 
recordKeyField, String partitionPathField) {
+    super(config, recordKeyField, partitionPathField);
+    this.rangePerBucket = config.getLong(Config.RANGE_PER_PARTITION_PROP, 
Config.DEFAULT_RANGE_PER_PARTITION);
+    this.partitionName = config.getString(Config.RANGE_PARTITION_NAME_PROP, 
Config.DEFAULT_RANGE_PARTITION_NAME);
+  }
+
+  @Override
+  public String getPartitionPath(GenericRecord record) {
+    Object partitionVal = HoodieAvroUtils.getNestedFieldVal(record, 
getPartitionPathFields().get(0), true);
+    String partitionPath;
+    if (partitionVal == null) {
+      partitionPath = KeyGenUtils.DEFAULT_PARTITION_PATH;
+    } else {
+      partitionPath = getPartitionPath(partitionVal);
+    }
+    return partitionPath;
+  }
+
+  public Object getDefaultPartitionVal() {
+    return 1L;
+  }
+
+  public String getPartitionPath(Object partitionVal) {
+    String bucketVal;
+    if (partitionVal instanceof Long) {
+      bucketVal = String.valueOf((Long) partitionVal / rangePerBucket);
+    } else if (partitionVal instanceof Integer) {
+      bucketVal = String.valueOf((Integer) partitionVal / rangePerBucket);
+    } else if (partitionVal instanceof Double) {
+      bucketVal = String.valueOf(Math.round((Double) partitionVal / 
rangePerBucket));
+    } else if (partitionVal instanceof Float) {
+      bucketVal = String.valueOf(Math.round((Float) partitionVal / 
rangePerBucket));
+    } else if (partitionVal instanceof String) {
+      Long partitionValInLong;
+      try {
+        partitionValInLong = Long.parseLong((String) partitionVal);
+      } catch (Exception e) {
+        throw new HoodieKeyGeneratorException(
+            "Failed to parse partition field: " + partitionVal + " from String 
to Long");
+      }
+      bucketVal = String.valueOf(partitionValInLong / rangePerBucket);
+    } else {
+      throw new HoodieNotSupportedException(
+          "Unexpected type for partition field: " + 
partitionVal.getClass().getName());

Review comment:
       can we embed in the exception that this partition type is not supported 
for RangePartitioning sort of. 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/RangePartitionAvroKeyGenerator.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.generic.GenericRecord;
+

Review comment:
       java docs w/ an example would be nice. 

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java
##########
@@ -65,4 +65,15 @@ public static Row getRow(GenericRecord record, Schema 
schema, StructType structT
     }
     return new GenericRowWithSchema(values, structType);
   }
+
+  public static Row genericRecordToRow(GenericRecord baseRecord, Schema 
schema, StructType structType) {

Review comment:
       can we add java docs for public methods. also, check if package private 
would suffice.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RangePartitionKeyGenerator.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
+
+public class RangePartitionKeyGenerator extends SimpleKeyGenerator {

Review comment:
       whats expectation for negative value? 

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestRangePartitionKeyGenerator.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestRangePartitionKeyGenerator extends KeyGeneratorTestUtilities {
+
+  private GenericRecord baseRecord;
+  private TypedProperties properties = new TypedProperties();
+
+  private Schema schema;
+  private StructType structType;
+  private Row baseRow;
+
+  @BeforeEach
+  public void initialize() throws IOException {
+    schema = SchemaTestUtil.getComplexEvolvedSchema();
+    structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
+    baseRecord = SchemaTestUtil
+        .generateAvroRecordFromJson(schema, 1, "001", "f1");
+    baseRow = genericRecordToRow(baseRecord, schema, structType);
+  }
+
+  private TypedProperties getCommonProps(String partitionField) {
+    TypedProperties properties = new TypedProperties();
+    properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "field1");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, 
partitionField);
+    properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, 
"true");
+    
properties.put(RangePartitionAvroKeyGenerator.Config.RANGE_PER_PARTITION_PROP, 
"10000");
+    
properties.put(RangePartitionAvroKeyGenerator.Config.RANGE_PARTITION_NAME_PROP, 
"bucket");
+    return properties;
+  }
+
+  @Test
+  public void testRangePartitionKeyGenerator() {
+    // test long
+    baseRecord.put("favoriteNumber", 100000L);

Review comment:
       can we do few more values in a loop.
   ```
   for(long val: {0,1, 20, 150, 10001, 11034, .... etc} {
       // you can get expected value by dividing the value by range per 
partition.
       test code.
   }
   ```

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RangePartitionKeyGenerator.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
+
+public class RangePartitionKeyGenerator extends SimpleKeyGenerator {

Review comment:
       java docs with example would be nice.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RangePartitionKeyGenerator.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
+
+public class RangePartitionKeyGenerator extends SimpleKeyGenerator {
+
+  private final RangePartitionAvroKeyGenerator rangePartitionAvroKeyGenerator;
+
+  public RangePartitionKeyGenerator(TypedProperties config) {
+    this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY),
+        config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY));
+  }
+
+  RangePartitionKeyGenerator(TypedProperties config, String 
partitionPathField) {
+    this(config, null, partitionPathField);
+  }
+
+  RangePartitionKeyGenerator(TypedProperties config, String recordKeyField, 
String partitionPathField) {
+    super(config, recordKeyField, partitionPathField);
+    this.rangePartitionAvroKeyGenerator = new 
RangePartitionAvroKeyGenerator(config, recordKeyField, partitionPathField);
+  }
+
+  @Override
+  public String getPartitionPath(GenericRecord record) {
+    return rangePartitionAvroKeyGenerator.getPartitionPath(record);
+  }
+
+  @Override
+  public String getRecordKey(Row row) {
+    buildFieldPositionMapIfNeeded(row.schema());
+    return RowKeyGeneratorHelper.getRecordKeyFromRow(row, 
getRecordKeyFields(), recordKeyPositions, false);
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    Object fieldVal = null;
+    buildFieldPositionMapIfNeeded(row.schema());
+    Object partitionPathFieldVal = 
RowKeyGeneratorHelper.getNestedFieldVal(row, 
partitionPathPositions.get(getPartitionPathFields().get(0)));
+    try {
+      if (partitionPathFieldVal == null || 
partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || 
partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
+          || 
partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+        fieldVal = rangePartitionAvroKeyGenerator.getDefaultPartitionVal();

Review comment:
       might as well return here and have another return in else block. 

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestRangePartitionKeyGenerator.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestRangePartitionKeyGenerator extends KeyGeneratorTestUtilities {
+
+  private GenericRecord baseRecord;
+  private TypedProperties properties = new TypedProperties();
+
+  private Schema schema;
+  private StructType structType;
+  private Row baseRow;
+
+  @BeforeEach
+  public void initialize() throws IOException {
+    schema = SchemaTestUtil.getComplexEvolvedSchema();
+    structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
+    baseRecord = SchemaTestUtil
+        .generateAvroRecordFromJson(schema, 1, "001", "f1");
+    baseRow = genericRecordToRow(baseRecord, schema, structType);
+  }
+
+  private TypedProperties getCommonProps(String partitionField) {
+    TypedProperties properties = new TypedProperties();
+    properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "field1");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, 
partitionField);
+    properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, 
"true");
+    
properties.put(RangePartitionAvroKeyGenerator.Config.RANGE_PER_PARTITION_PROP, 
"10000");
+    
properties.put(RangePartitionAvroKeyGenerator.Config.RANGE_PARTITION_NAME_PROP, 
"bucket");
+    return properties;
+  }
+
+  @Test
+  public void testRangePartitionKeyGenerator() {
+    // test long
+    baseRecord.put("favoriteNumber", 100000L);
+    properties = getCommonProps("favoriteNumber");
+    RangePartitionKeyGenerator keyGen = new 
RangePartitionKeyGenerator(properties);
+    HoodieKey hk1 = keyGen.getKey(baseRecord);
+    assertEquals("bucket=10", hk1.getPartitionPath());
+    // test Row
+    baseRow = genericRecordToRow(baseRecord, schema, structType);
+    assertEquals("bucket=10", keyGen.getPartitionPath(baseRow));
+
+    // test int
+    baseRecord.put("favoriteIntNumber", 110000);

Review comment:
       similar comment as above. 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/RangePartitionAvroKeyGenerator.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.generic.GenericRecord;
+
+public class RangePartitionAvroKeyGenerator extends SimpleAvroKeyGenerator {
+
+  private final Long rangePerBucket;
+  private final String partitionName;
+
+  public static class Config {
+    public static final String RANGE_PER_PARTITION_PROP = 
"hoodie.keygen.range.partition.num";
+    public static final Long DEFAULT_RANGE_PER_PARTITION = 100000L;
+    public static final String RANGE_PARTITION_NAME_PROP = 
"hoodie.keygen.range.partition.name";
+    public static final String DEFAULT_RANGE_PARTITION_NAME = "rangePartition";
+  }
+
+  public RangePartitionAvroKeyGenerator(TypedProperties config) {
+    this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY),
+        config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY));
+  }
+
+  RangePartitionAvroKeyGenerator(TypedProperties config, String 
partitionPathField) {
+    this(config, null, partitionPathField);
+  }
+
+  RangePartitionAvroKeyGenerator(TypedProperties config, String 
recordKeyField, String partitionPathField) {
+    super(config, recordKeyField, partitionPathField);
+    this.rangePerBucket = config.getLong(Config.RANGE_PER_PARTITION_PROP, 
Config.DEFAULT_RANGE_PER_PARTITION);
+    this.partitionName = config.getString(Config.RANGE_PARTITION_NAME_PROP, 
Config.DEFAULT_RANGE_PARTITION_NAME);
+  }
+
+  @Override
+  public String getPartitionPath(GenericRecord record) {
+    Object partitionVal = HoodieAvroUtils.getNestedFieldVal(record, 
getPartitionPathFields().get(0), true);
+    String partitionPath;
+    if (partitionVal == null) {
+      partitionPath = KeyGenUtils.DEFAULT_PARTITION_PATH;
+    } else {
+      partitionPath = getPartitionPath(partitionVal);
+    }
+    return partitionPath;
+  }
+
+  public Object getDefaultPartitionVal() {
+    return 1L;
+  }
+
+  public String getPartitionPath(Object partitionVal) {
+    String bucketVal;
+    if (partitionVal instanceof Long) {
+      bucketVal = String.valueOf((Long) partitionVal / rangePerBucket);
+    } else if (partitionVal instanceof Integer) {
+      bucketVal = String.valueOf((Integer) partitionVal / rangePerBucket);
+    } else if (partitionVal instanceof Double) {
+      bucketVal = String.valueOf(Math.round((Double) partitionVal / 
rangePerBucket));
+    } else if (partitionVal instanceof Float) {
+      bucketVal = String.valueOf(Math.round((Float) partitionVal / 
rangePerBucket));
+    } else if (partitionVal instanceof String) {
+      Long partitionValInLong;
+      try {
+        partitionValInLong = Long.parseLong((String) partitionVal);
+      } catch (Exception e) {
+        throw new HoodieKeyGeneratorException(
+            "Failed to parse partition field: " + partitionVal + " from String 
to Long");
+      }
+      bucketVal = String.valueOf(partitionValInLong / rangePerBucket);

Review comment:
       guess we can move this just after line 85. 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/RangePartitionAvroKeyGenerator.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.generic.GenericRecord;
+
+public class RangePartitionAvroKeyGenerator extends SimpleAvroKeyGenerator {
+
+  private final Long rangePerBucket;
+  private final String partitionName;
+
+  public static class Config {
+    public static final String RANGE_PER_PARTITION_PROP = 
"hoodie.keygen.range.partition.num";
+    public static final Long DEFAULT_RANGE_PER_PARTITION = 100000L;
+    public static final String RANGE_PARTITION_NAME_PROP = 
"hoodie.keygen.range.partition.name";
+    public static final String DEFAULT_RANGE_PARTITION_NAME = "rangePartition";
+  }
+
+  public RangePartitionAvroKeyGenerator(TypedProperties config) {
+    this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY),
+        config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY));
+  }
+
+  RangePartitionAvroKeyGenerator(TypedProperties config, String 
partitionPathField) {
+    this(config, null, partitionPathField);
+  }
+
+  RangePartitionAvroKeyGenerator(TypedProperties config, String 
recordKeyField, String partitionPathField) {
+    super(config, recordKeyField, partitionPathField);
+    this.rangePerBucket = config.getLong(Config.RANGE_PER_PARTITION_PROP, 
Config.DEFAULT_RANGE_PER_PARTITION);
+    this.partitionName = config.getString(Config.RANGE_PARTITION_NAME_PROP, 
Config.DEFAULT_RANGE_PARTITION_NAME);
+  }
+
+  @Override
+  public String getPartitionPath(GenericRecord record) {
+    Object partitionVal = HoodieAvroUtils.getNestedFieldVal(record, 
getPartitionPathFields().get(0), true);
+    String partitionPath;
+    if (partitionVal == null) {
+      partitionPath = KeyGenUtils.DEFAULT_PARTITION_PATH;
+    } else {
+      partitionPath = getPartitionPath(partitionVal);
+    }
+    return partitionPath;
+  }
+
+  public Object getDefaultPartitionVal() {

Review comment:
       do these methods need to be public? package private may not suffice is 
it? 

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestRangePartitionKeyGenerator.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestRangePartitionKeyGenerator extends KeyGeneratorTestUtilities {
+
+  private GenericRecord baseRecord;
+  private TypedProperties properties = new TypedProperties();
+
+  private Schema schema;
+  private StructType structType;
+  private Row baseRow;
+
+  @BeforeEach
+  public void initialize() throws IOException {
+    schema = SchemaTestUtil.getComplexEvolvedSchema();
+    structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
+    baseRecord = SchemaTestUtil
+        .generateAvroRecordFromJson(schema, 1, "001", "f1");
+    baseRow = genericRecordToRow(baseRecord, schema, structType);
+  }
+
+  private TypedProperties getCommonProps(String partitionField) {
+    TypedProperties properties = new TypedProperties();
+    properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "field1");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, 
partitionField);
+    properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, 
"true");
+    
properties.put(RangePartitionAvroKeyGenerator.Config.RANGE_PER_PARTITION_PROP, 
"10000");
+    
properties.put(RangePartitionAvroKeyGenerator.Config.RANGE_PARTITION_NAME_PROP, 
"bucket");
+    return properties;
+  }
+
+  @Test
+  public void testRangePartitionKeyGenerator() {
+    // test long
+    baseRecord.put("favoriteNumber", 100000L);
+    properties = getCommonProps("favoriteNumber");
+    RangePartitionKeyGenerator keyGen = new 
RangePartitionKeyGenerator(properties);
+    HoodieKey hk1 = keyGen.getKey(baseRecord);
+    assertEquals("bucket=10", hk1.getPartitionPath());
+    // test Row
+    baseRow = genericRecordToRow(baseRecord, schema, structType);
+    assertEquals("bucket=10", keyGen.getPartitionPath(baseRow));
+
+    // test int
+    baseRecord.put("favoriteIntNumber", 110000);
+    properties = getCommonProps("favoriteIntNumber");
+    keyGen = new RangePartitionKeyGenerator(properties);
+    HoodieKey hk2 = keyGen.getKey(baseRecord);
+    assertEquals("bucket=11", hk2.getPartitionPath());
+    // test Row
+    baseRow = genericRecordToRow(baseRecord, schema, structType);
+    assertEquals("bucket=11", keyGen.getPartitionPath(baseRow));
+
+    // test double
+    baseRecord.put("favoriteDoubleNumber", 120000.1d);

Review comment:
       same as above

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestRangePartitionKeyGenerator.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestRangePartitionKeyGenerator extends KeyGeneratorTestUtilities {
+
+  private GenericRecord baseRecord;
+  private TypedProperties properties = new TypedProperties();
+
+  private Schema schema;
+  private StructType structType;
+  private Row baseRow;
+
+  @BeforeEach
+  public void initialize() throws IOException {
+    schema = SchemaTestUtil.getComplexEvolvedSchema();
+    structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
+    baseRecord = SchemaTestUtil
+        .generateAvroRecordFromJson(schema, 1, "001", "f1");
+    baseRow = genericRecordToRow(baseRecord, schema, structType);
+  }
+
+  private TypedProperties getCommonProps(String partitionField) {
+    TypedProperties properties = new TypedProperties();
+    properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "field1");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, 
partitionField);
+    properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, 
"true");
+    
properties.put(RangePartitionAvroKeyGenerator.Config.RANGE_PER_PARTITION_PROP, 
"10000");
+    
properties.put(RangePartitionAvroKeyGenerator.Config.RANGE_PARTITION_NAME_PROP, 
"bucket");
+    return properties;
+  }
+
+  @Test
+  public void testRangePartitionKeyGenerator() {
+    // test long
+    baseRecord.put("favoriteNumber", 100000L);
+    properties = getCommonProps("favoriteNumber");
+    RangePartitionKeyGenerator keyGen = new 
RangePartitionKeyGenerator(properties);
+    HoodieKey hk1 = keyGen.getKey(baseRecord);
+    assertEquals("bucket=10", hk1.getPartitionPath());
+    // test Row
+    baseRow = genericRecordToRow(baseRecord, schema, structType);
+    assertEquals("bucket=10", keyGen.getPartitionPath(baseRow));
+
+    // test int
+    baseRecord.put("favoriteIntNumber", 110000);
+    properties = getCommonProps("favoriteIntNumber");
+    keyGen = new RangePartitionKeyGenerator(properties);
+    HoodieKey hk2 = keyGen.getKey(baseRecord);
+    assertEquals("bucket=11", hk2.getPartitionPath());
+    // test Row
+    baseRow = genericRecordToRow(baseRecord, schema, structType);
+    assertEquals("bucket=11", keyGen.getPartitionPath(baseRow));
+
+    // test double
+    baseRecord.put("favoriteDoubleNumber", 120000.1d);
+    properties = getCommonProps("favoriteDoubleNumber");
+    keyGen = new RangePartitionKeyGenerator(properties);
+    HoodieKey hk3 = keyGen.getKey(baseRecord);
+    assertEquals("bucket=12", hk3.getPartitionPath());
+    // test Row
+    baseRow = genericRecordToRow(baseRecord, schema, structType);
+    assertEquals("bucket=12", keyGen.getPartitionPath(baseRow));
+
+    // test float
+    baseRecord.put("favoriteFloatNumber", 130000.123f);

Review comment:
       same as above

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestRangePartitionKeyGenerator.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestRangePartitionKeyGenerator extends KeyGeneratorTestUtilities {
+
+  private GenericRecord baseRecord;
+  private TypedProperties properties = new TypedProperties();
+
+  private Schema schema;
+  private StructType structType;
+  private Row baseRow;
+
+  @BeforeEach
+  public void initialize() throws IOException {
+    schema = SchemaTestUtil.getComplexEvolvedSchema();
+    structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
+    baseRecord = SchemaTestUtil
+        .generateAvroRecordFromJson(schema, 1, "001", "f1");
+    baseRow = genericRecordToRow(baseRecord, schema, structType);
+  }
+
+  private TypedProperties getCommonProps(String partitionField) {
+    TypedProperties properties = new TypedProperties();
+    properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "field1");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, 
partitionField);
+    properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, 
"true");
+    
properties.put(RangePartitionAvroKeyGenerator.Config.RANGE_PER_PARTITION_PROP, 
"10000");
+    
properties.put(RangePartitionAvroKeyGenerator.Config.RANGE_PARTITION_NAME_PROP, 
"bucket");
+    return properties;
+  }
+
+  @Test
+  public void testRangePartitionKeyGenerator() {
+    // test long
+    baseRecord.put("favoriteNumber", 100000L);
+    properties = getCommonProps("favoriteNumber");
+    RangePartitionKeyGenerator keyGen = new 
RangePartitionKeyGenerator(properties);
+    HoodieKey hk1 = keyGen.getKey(baseRecord);
+    assertEquals("bucket=10", hk1.getPartitionPath());
+    // test Row
+    baseRow = genericRecordToRow(baseRecord, schema, structType);
+    assertEquals("bucket=10", keyGen.getPartitionPath(baseRow));
+
+    // test int
+    baseRecord.put("favoriteIntNumber", 110000);
+    properties = getCommonProps("favoriteIntNumber");
+    keyGen = new RangePartitionKeyGenerator(properties);
+    HoodieKey hk2 = keyGen.getKey(baseRecord);
+    assertEquals("bucket=11", hk2.getPartitionPath());
+    // test Row
+    baseRow = genericRecordToRow(baseRecord, schema, structType);
+    assertEquals("bucket=11", keyGen.getPartitionPath(baseRow));
+
+    // test double
+    baseRecord.put("favoriteDoubleNumber", 120000.1d);
+    properties = getCommonProps("favoriteDoubleNumber");
+    keyGen = new RangePartitionKeyGenerator(properties);
+    HoodieKey hk3 = keyGen.getKey(baseRecord);
+    assertEquals("bucket=12", hk3.getPartitionPath());
+    // test Row
+    baseRow = genericRecordToRow(baseRecord, schema, structType);
+    assertEquals("bucket=12", keyGen.getPartitionPath(baseRow));
+
+    // test float
+    baseRecord.put("favoriteFloatNumber", 130000.123f);
+    properties = getCommonProps("favoriteFloatNumber");
+    keyGen = new RangePartitionKeyGenerator(properties);
+    HoodieKey hk4 = keyGen.getKey(baseRecord);
+    assertEquals("bucket=13", hk4.getPartitionPath());
+    // test Row
+    baseRow = genericRecordToRow(baseRecord, schema, structType);
+    assertEquals("bucket=13", keyGen.getPartitionPath(baseRow));
+
+    // test string
+    baseRecord.put("field2", "140000");
+    properties = getCommonProps("field2");
+    keyGen = new RangePartitionKeyGenerator(properties);
+    HoodieKey hk5 = keyGen.getKey(baseRecord);
+    assertEquals("bucket=14", hk5.getPartitionPath());
+    // test Row
+    baseRow = genericRecordToRow(baseRecord, schema, structType);
+    assertEquals("bucket=14", keyGen.getPartitionPath(baseRow));

Review comment:
       can you add a test for exception case. something like "123abc". 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to