Magicbeanbuyer commented on issue #2498:
URL: https://github.com/apache/hudi/issues/2498#issuecomment-775872798
Hey @vinothchandar,
we've came across the same issue with reading MERGE_ON_READ table using
spark. We consume data from our AWS MSK topic, write the data using
`deltastreamer` on AWS EMR, and store the data in an S3 bucket.
Following is our implementation.
### Write data
```
spark-submit \
--jars /usr/lib/hudi/hudi-utilities-bundle_2.12-0.7.0.jar \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"\
--conf "spark.sql.hive.convertMetastoreParquet=false" \
/usr/lib/hudi/hudi-utilities-bundle_2.12-0.7.0.jar \
--spark-master yarn \
--schemaprovider-class
org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--table-type MERGE_ON_READ \
--source-ordering-field id \
--target-base-path $target_base_path \
--target-table $target_table \
--hoodie-conf
"hoodie.deltastreamer.schemaprovider.source.schema.file=$schema_file_path" \
--hoodie-conf
"hoodie.deltastreamer.schemaprovider.target.schema.file=$schema_file_path" \
--hoodie-conf
"hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator"
\
--hoodie-conf "hoodie.datasource.write.recordkey.field=id" \
--hoodie-conf
"hoodie.datasource.write.partitionpath.field=partitiontime:TIMESTAMP" \
--hoodie-conf
"hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd'T'HH:mm:ss.SSSZ"
\
--hoodie-conf "hoodie.datasource.write.hive_style_partitioning=true" \
--hoodie-conf
"hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy" \
--hoodie-conf
"hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING" \
--hoodie-conf
"hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit=milliseconds"
\
--hoodie-conf "hoodie.deltastreamer.keygen.timebased.timezone=UTC" \
--hoodie-conf "hoodie.deltastreamer.source.kafka.topic=$kafka_topic" \
--hoodie-conf "bootstrap.servers=$kafka_bootstrap_servers" \
--hoodie-conf "auto.offset.reset=earliest"
```
The hoodie table is generated in our S3 bucket no problem. However, Error
message was thrown when we try to read it using either `python` or `scala`.
### Read Data
#### Scala
```
spark-shell \
--packages
org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1
\
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf "spark.sql.hive.convertMetastoreParquet=false"
```
Trying to load data
```
val basePath="s3://path/to/base/table"
val df = spark.read.format("hudi").load(basePath + "/*/*/*/*")
```
Error message
```
java.lang.NoSuchMethodError:
org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(Lorg/apache/spark/sql/SparkSession;Lscala/collection/Seq;Lscala/collection/immutable/Map;Lscala/Option;Lorg/apache/spark/sql/execution/datasources/FileStatusCache;)V
at
org.apache.hudi.HoodieSparkUtils$.createInMemoryFileIndex(HoodieSparkUtils.scala:89)
at
org.apache.hudi.MergeOnReadSnapshotRelation.buildFileIndex(MergeOnReadSnapshotRelation.scala:127)
at
org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:72)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
at
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
at
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
... 47 elided
```
#### Python
```
pyspark \
--packages
org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1
\
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf "spark.sql.hive.convertMetastoreParquet=false"
```
Trying to load data
```
basePath="s3://path/to/base/table"
df = spark.read.format("hudi").load(basePath + "/*/*/*/*")
```
Error message
```
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 178, in load
return self._df(self._jreader.load(path))
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
line 1305, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 128, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o79.load.
: java.lang.NoSuchMethodError:
org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(Lorg/apache/spark/sql/SparkSession;Lscala/collection/Seq;Lscala/collection/immutable/Map;Lscala/Option;Lorg/apache/spark/sql/execution/datasources/FileStatusCache;)V
at
org.apache.hudi.HoodieSparkUtils$.createInMemoryFileIndex(HoodieSparkUtils.scala:89)
at
org.apache.hudi.MergeOnReadSnapshotRelation.buildFileIndex(MergeOnReadSnapshotRelation.scala:127)
at
org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:72)
at
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89)
at
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
at
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
at
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
at
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]