[ 
https://issues.apache.org/jira/browse/HUDI-7388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brandon Dahler updated HUDI-7388:
---------------------------------
    Description: 
h2. Problem

When attempting to ingest record with an Avro target schema which includes a 
field that uses the 
[decimal|https://avro.apache.org/docs/1.11.0/spec.html#Decimal] logical type in 
Hudi 0.14.1, an exception is thrown:
{code:java}
24/02/06 21:30:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: org.apache.avro.AvroRuntimeException: cannot 
support rewrite value for schema type: 
{"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2}
 since the old schema type is: 
{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
        at 
org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
        at 
org.apache.hudi.utilities.streamer.HoodieStreamerUtils.lambda$null$a903797$1(HoodieStreamerUtils.java:92)
        at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitions$1(JavaRDDLike.scala:153)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.avro.AvroRuntimeException: cannot support rewrite value 
for schema type: 
{"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2}
 since the old schema type is: 
{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1088)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1006)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:951)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:847)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1259)
        at 
org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
        at 
org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
        at 
org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
        ... 21 more {code}
h2. Reproduction Steps

1. Setup clean spark install
{code:java}
mkdir /tmp/hudi-decimal-repro
cd /tmp/hudi-decimal-repro
tar -xvzf ~/spark-3.4.2-bin-hadoop3.tgz{code}
2. Create a minimal schema file based on [the demo 
schema|https://github.com/apache/hudi/blob/release-0.14.1/docker/demo/config/schema.avsc].
  The only change is the {{type}} of the field named {{{}low{}}}.
{code:java}
echo '{
  "type":"record",
  "name":"stock_ticks",
  "fields":[{
     "name": "volume",
     "type": "long"
  }, {
     "name": "ts",
     "type": "string"
  }, {
     "name": "symbol",
     "type": "string"
  },{
     "name": "year",
     "type": "int"
  },{
     "name": "month",
     "type": "string"
  },{
     "name": "high",
     "type": "double"
  },{
     "name": "low",
     "type": {
       "type": "bytes",
       "logicalType": "decimal",
       "precision": 4,
       "scale": 2
     }
  },{
     "name": "key",
     "type": "string"
  },{
     "name": "date",
     "type":"string"
  }, {
     "name": "close",
     "type": "double"
  }, {
     "name": "open",
     "type": "double"
  }, {
     "name": "day",
     "type":"string"
  }
]}' > schema.avsc{code}
3. Create a minimal properties file
{code:java}
echo "hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=date
hoodie.table.recordkey.fields=key
hoodie.table.partition.fields=date
hoodie.streamer.schemaprovider.source.schema.file=/tmp/hudi-decimal-repro/schema.avsc
hoodie.streamer.schemaprovider.target.schema.file=/tmp/hudi-decimal-repro/schema.avsc
hoodie.streamer.source.dfs.root=/tmp/hudi-decimal-repro/data" > 
decimal-repro.properties{code}
4. Copy data file from the docker demo
{code:java}
mkdir data
cd data
wget 
https://raw.githubusercontent.com/apache/hudi/release-0.14.1/docker/demo/data/batch_1.json
 
cd ..{code}
5. Run HoodieStreamer
{code:java}
spark-3.4.2-bin-hadoop3/bin/spark-submit \
   --packages 
org.apache.hudi:hudi-utilities-slim-bundle_2.12:0.15.0,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.15.0
 \
   --conf spark.kryoserializer.buffer.max=200m \
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
   --class org.apache.hudi.utilities.streamer.HoodieStreamer \
   spark-3.4.2-bin-hadoop3/examples/jars/spark-examples_2.12-3.4.2.jar \
   --table-type COPY_ON_WRITE \
   --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
   --target-base-path /tmp/hudi-decimal-repro/table \
   --target-table table \
   --props /tmp/hudi-decimal-repro/decimal-repro.properties \
   --schemaprovider-class 
org.apache.hudi.utilities.schema.FilebasedSchemaProvider {code}
h3. Expected Results

Command runs successfully, data is ingested successfully into 
{{{}/tmp/hudi-decimal-repro/table{}}}, some files exist under 
{{{}/tmp/hudi-decimal-repro/table{}}}/{{{}2018/08/31/{}}}.
h3. Actual Results

Command fails with exception, no data is ingsted into the table.  Table left 
with a hanging commit at the requested state.

Logs of the attempted run are attached as spark.log
h2. Additional Information

This issue does not appear to exist in versions 0.12.2 through 0.14.0 based on 
my own testing.  It does affect all of 0.14.1, 0.15.0, 1.0.0-beta1, 
1.0.0-beta2, and 1.0.0 releases.

  was:
h2. Problem

When attempting to ingest record with an Avro target schema which includes a 
field that uses the 
[decimal|https://avro.apache.org/docs/1.11.0/spec.html#Decimal] logical type in 
Hudi 0.14.1, an exception is thrown:
{code:java}
24/02/06 21:30:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: org.apache.avro.AvroRuntimeException: cannot 
support rewrite value for schema type: 
{"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2}
 since the old schema type is: 
{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
        at 
org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
        at 
org.apache.hudi.utilities.streamer.HoodieStreamerUtils.lambda$null$a903797$1(HoodieStreamerUtils.java:92)
        at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitions$1(JavaRDDLike.scala:153)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.avro.AvroRuntimeException: cannot support rewrite value 
for schema type: 
{"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2}
 since the old schema type is: 
{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1088)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1006)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:951)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:847)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1259)
        at 
org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
        at 
org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
        at 
org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
        ... 21 more {code}
h2. Reproduction Steps

1. Setup clean spark install
{code:java}
mkdir /tmp/hudi-decimal-repro
cd /tmp/hudi-decimal-repro
tar -xvzf ~/spark-3.4.2-bin-hadoop3.tgz{code}
2. Create a minimal schema file based on [the demo 
schema|https://github.com/apache/hudi/blob/release-0.14.1/docker/demo/config/schema.avsc].
  The only change is the {{type}} of the field named {{{}low{}}}.
{code:java}
echo '{
  "type":"record",
  "name":"stock_ticks",
  "fields":[{
     "name": "volume",
     "type": "long"
  }, {
     "name": "ts",
     "type": "string"
  }, {
     "name": "symbol",
     "type": "string"
  },{
     "name": "year",
     "type": "int"
  },{
     "name": "month",
     "type": "string"
  },{
     "name": "high",
     "type": "double"
  },{
     "name": "low",
     "type": {
       "type": "bytes",
       "logicalType": "decimal",
       "precision": 4,
       "scale": 2
     }
  },{
     "name": "key",
     "type": "string"
  },{
     "name": "date",
     "type":"string"
  }, {
     "name": "close",
     "type": "double"
  }, {
     "name": "open",
     "type": "double"
  }, {
     "name": "day",
     "type":"string"
  }
]}' > schema.avsc{code}
3. Create a minimal properties file
{code:java}
echo "hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=date
hoodie.table.recordkey.fields=key
hoodie.table.partition.fields=date
hoodie.streamer.schemaprovider.source.schema.file=/tmp/hudi-decimal-repro/schema.avsc
hoodie.streamer.schemaprovider.target.schema.file=/tmp/hudi-decimal-repro/schema.avsc
hoodie.streamer.source.dfs.root=/tmp/hudi-decimal-repro/data" > 
decimal-repro.properties{code}
4. Copy data file from the docker demo
{code:java}
mkdir data
cd data
wget 
https://raw.githubusercontent.com/apache/hudi/release-0.14.1/docker/demo/data/batch_1.json
 
cd ..{code}
5. Run HoodieStreamer
{code:java}
spark-3.4.2-bin-hadoop3/bin/spark-submit \
   --packages 
org.apache.hudi:hudi-utilities-slim-bundle_2.12:0.15.0,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.15.0
 \
   --conf spark.kryoserializer.buffer.max=200m \
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
   --class org.apache.hudi.utilities.streamer.HoodieStreamer \
   spark-3.4.2-bin-hadoop3/examples/jars/spark-examples_2.12-3.4.2.jar \
   --table-type COPY_ON_WRITE \
   --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
   --target-base-path /tmp/hudi-decimal-repro/table \
   --target-table table \
   --props /tmp/hudi-decimal-repro/decimal-repro.properties \
   --schemaprovider-class 
org.apache.hudi.utilities.schema.FilebasedSchemaProvider {code}
h3. Expected Results

Command runs successfully, data is ingested successfully into 
{{{}/tmp/hudi-decimal-repro/table{}}}, some files exist under 
{{{}/tmp/hudi-decimal-repro/table{}}}/{{{}2018/08/31/{}}}.
h3. Actual Results

Command fails with exception, no data is ingsted into the table.  Table left 
with a hanging commit at the requested state.

Logs of the attempted run are attached as spark.log
h2. Additional Information

This issue does not appear to exist in versions 0.12.2 through 0.14.0 based on 
my own testing.  It does affect all of 0.14.1, 0.15.0, 1.0.0-beta1, and 
1.0.0-beta2 releases.


> [Regression] Records with a field of logical type decimal can no longer be 
> ingested via HoodieStreamer
> ------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-7388
>                 URL: https://issues.apache.org/jira/browse/HUDI-7388
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: deltastreamer
>    Affects Versions: 1.0.0-beta1, 0.14.1, 0.15.0, 1.0.0-beta2, 1.0.0
>            Reporter: Brandon Dahler
>            Priority: Major
>             Fix For: 0.16.0
>
>         Attachments: decimal-repro.properties, schema.avsc, spark.log
>
>
> h2. Problem
> When attempting to ingest record with an Avro target schema which includes a 
> field that uses the 
> [decimal|https://avro.apache.org/docs/1.11.0/spec.html#Decimal] logical type 
> in Hudi 0.14.1, an exception is thrown:
> {code:java}
> 24/02/06 21:30:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.RuntimeException: org.apache.avro.AvroRuntimeException: cannot 
> support rewrite value for schema type: 
> {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2}
>  since the old schema type is: 
> {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
>         at 
> org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
>         at 
> org.apache.hudi.utilities.streamer.HoodieStreamerUtils.lambda$null$a903797$1(HoodieStreamerUtils.java:92)
>         at 
> org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitions$1(JavaRDDLike.scala:153)
>         at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
>         at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
>         at 
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>         at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
>         at org.apache.spark.scheduler.Task.run(Task.scala:139)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>         at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: org.apache.avro.AvroRuntimeException: cannot support rewrite value 
> for schema type: 
> {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2}
>  since the old schema type is: 
> {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
>         at 
> org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1088)
>         at 
> org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1006)
>         at 
> org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:951)
>         at 
> org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
>         at 
> org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
>         at 
> org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
>         at 
> org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:847)
>         at 
> org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1259)
>         at 
> org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
>         at 
> org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
>         at 
> org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
>         ... 21 more {code}
> h2. Reproduction Steps
> 1. Setup clean spark install
> {code:java}
> mkdir /tmp/hudi-decimal-repro
> cd /tmp/hudi-decimal-repro
> tar -xvzf ~/spark-3.4.2-bin-hadoop3.tgz{code}
> 2. Create a minimal schema file based on [the demo 
> schema|https://github.com/apache/hudi/blob/release-0.14.1/docker/demo/config/schema.avsc].
>   The only change is the {{type}} of the field named {{{}low{}}}.
> {code:java}
> echo '{
>   "type":"record",
>   "name":"stock_ticks",
>   "fields":[{
>      "name": "volume",
>      "type": "long"
>   }, {
>      "name": "ts",
>      "type": "string"
>   }, {
>      "name": "symbol",
>      "type": "string"
>   },{
>      "name": "year",
>      "type": "int"
>   },{
>      "name": "month",
>      "type": "string"
>   },{
>      "name": "high",
>      "type": "double"
>   },{
>      "name": "low",
>      "type": {
>        "type": "bytes",
>        "logicalType": "decimal",
>        "precision": 4,
>        "scale": 2
>      }
>   },{
>      "name": "key",
>      "type": "string"
>   },{
>      "name": "date",
>      "type":"string"
>   }, {
>      "name": "close",
>      "type": "double"
>   }, {
>      "name": "open",
>      "type": "double"
>   }, {
>      "name": "day",
>      "type":"string"
>   }
> ]}' > schema.avsc{code}
> 3. Create a minimal properties file
> {code:java}
> echo "hoodie.datasource.write.recordkey.field=key
> hoodie.datasource.write.partitionpath.field=date
> hoodie.table.recordkey.fields=key
> hoodie.table.partition.fields=date
> hoodie.streamer.schemaprovider.source.schema.file=/tmp/hudi-decimal-repro/schema.avsc
> hoodie.streamer.schemaprovider.target.schema.file=/tmp/hudi-decimal-repro/schema.avsc
> hoodie.streamer.source.dfs.root=/tmp/hudi-decimal-repro/data" > 
> decimal-repro.properties{code}
> 4. Copy data file from the docker demo
> {code:java}
> mkdir data
> cd data
> wget 
> https://raw.githubusercontent.com/apache/hudi/release-0.14.1/docker/demo/data/batch_1.json
>  
> cd ..{code}
> 5. Run HoodieStreamer
> {code:java}
> spark-3.4.2-bin-hadoop3/bin/spark-submit \
>    --packages 
> org.apache.hudi:hudi-utilities-slim-bundle_2.12:0.15.0,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.15.0
>  \
>    --conf spark.kryoserializer.buffer.max=200m \
>    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>    --class org.apache.hudi.utilities.streamer.HoodieStreamer \
>    spark-3.4.2-bin-hadoop3/examples/jars/spark-examples_2.12-3.4.2.jar \
>    --table-type COPY_ON_WRITE \
>    --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
>    --target-base-path /tmp/hudi-decimal-repro/table \
>    --target-table table \
>    --props /tmp/hudi-decimal-repro/decimal-repro.properties \
>    --schemaprovider-class 
> org.apache.hudi.utilities.schema.FilebasedSchemaProvider {code}
> h3. Expected Results
> Command runs successfully, data is ingested successfully into 
> {{{}/tmp/hudi-decimal-repro/table{}}}, some files exist under 
> {{{}/tmp/hudi-decimal-repro/table{}}}/{{{}2018/08/31/{}}}.
> h3. Actual Results
> Command fails with exception, no data is ingsted into the table.  Table left 
> with a hanging commit at the requested state.
> Logs of the attempted run are attached as spark.log
> h2. Additional Information
> This issue does not appear to exist in versions 0.12.2 through 0.14.0 based 
> on my own testing.  It does affect all of 0.14.1, 0.15.0, 1.0.0-beta1, 
> 1.0.0-beta2, and 1.0.0 releases.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to