jtmzheng opened a new issue #2566:
URL: https://github.com/apache/hudi/issues/2566


   I'm trying out Hudi 0.7.0 locally via PySpark and, oddly, data I write can be read back as parquet but not as Hudi. It seems the `InMemoryFileIndex` constructor Hudi expects is missing?
   
   Stack trace from the pytest run:
   ```
   ============================================================== FAILURES ==============================================================
   _____________________________________________________________ test_hudi ______________________________________________________________
   answer = 'xro66', gateway_client = <py4j.java_gateway.GatewayClient object at 0x1069a83d0>, target_id = 'o65', name = 'load'
   
       def get_return_value(answer, gateway_client, target_id=None, name=None):
           """Converts an answer received from the Java gateway into a Python object.
   
           For example, string representation of integers are converted to Python
           integer, string representation of objects are converted to JavaObject
           instances, etc.
   
           :param answer: the string returned by the Java gateway
           :param gateway_client: the gateway client used to communicate with the Java
               Gateway. Only necessary if the answer is a reference (e.g., object,
               list, map)
           :param target_id: the name of the object from which the answer comes from
               (e.g., *object1* in `object1.hello()`). Optional.
           :param name: the name of the member from which the answer comes from
               (e.g., *hello* in `object1.hello()`). Optional.
           """
           if is_error(answer)[0]:
               if len(answer) > 1:
                   type = answer[1]
                   value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                   if answer[1] == REFERENCE_TYPE:
                       raise Py4JJavaError(
                           "An error occurred while calling {0}{1}{2}.\n".
   >                       format(target_id, ".", name), value)
   E                   py4j.protocol.Py4JJavaError: An error occurred while calling o65.load.
   E                   : java.lang.NoSuchMethodError: 'void org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(org.apache.spark.sql.SparkSession, scala.collection.Seq, scala.collection.immutable.Map, scala.Option, org.apache.spark.sql.execution.datasources.FileStatusCache)'
   E                       at org.apache.hudi.HoodieSparkUtils$.createInMemoryFileIndex(HoodieSparkUtils.scala:89)
   E                       at org.apache.hudi.MergeOnReadSnapshotRelation.buildFileIndex(MergeOnReadSnapshotRelation.scala:127)
   E                       at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:72)
   E                       at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89)
   E                       at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
   E                       at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:339)
   E                       at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
   E                       at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
   E                       at scala.Option.getOrElse(Option.scala:189)
   E                       at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
   E                       at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:214)
   E                       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   E                       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   E                       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   E                       at java.base/java.lang.reflect.Method.invoke(Method.java:567)
   E                       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   E                       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
   E                       at py4j.Gateway.invoke(Gateway.java:282)
   E                       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   E                       at py4j.commands.CallCommand.execute(CallCommand.java:79)
   E                       at py4j.GatewayConnection.run(GatewayConnection.java:238)
   E                       at java.base/java.lang.Thread.run(Thread.java:830)
   
   ../../.pyenv/versions/3.7.6/envs/spark/lib/python3.7/site-packages/py4j/protocol.py:328: Py4JJavaError
   ```
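   
   Note the class itself seems to be on the classpath; only this particular constructor overload is reported missing. To confirm, a py4j reflection sketch like the one below (untested; it only assumes the same `spark` session as in the repro and the standard JDK reflection API) should list the `InMemoryFileIndex` constructors the running JVM actually has:
   
   ```
   # Sketch: list the constructor signatures the Spark JVM actually exposes
   # for InMemoryFileIndex, using py4j's gateway into the driver JVM.
   jvm = spark.sparkContext._jvm
   cls = jvm.java.lang.Class.forName(
       "org.apache.spark.sql.execution.datasources.InMemoryFileIndex"
   )
   for ctor in cls.getConstructors():
       print(ctor)  # one line per available <init> overload
   ```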
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Sample code using pytest:
   ```
   import pytest
   from pyspark import SparkConf
   from pyspark import SparkContext
   from pyspark.sql import Row
   from pyspark.sql import SparkSession
   
   
   def test_hudi(tmp_path):
       SparkContext.getOrCreate(
           conf=SparkConf()
           .setAppName("testing")
           .setMaster("local[1]")
           .set(
               "spark.jars.packages",
               "org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1",
           )
           .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
           .set("spark.sql.hive.convertMetastoreParquet", "false")
       )
       spark = SparkSession.builder.getOrCreate()
   
       hudi_options = {
           "hoodie.table.name": "test",
           "hoodie.datasource.write.recordkey.field": "id",
           "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
           "hoodie.datasource.write.partitionpath.field": "year,month,day",
           "hoodie.datasource.write.table.name": "test",
           "hoodie.datasource.write.table.type": "MERGE_ON_READ",
           "hoodie.datasource.write.operation": "upsert",
           "hoodie.datasource.write.precombine.field": "ts",
       }
       df = spark.createDataFrame(
           [
               Row(id=1, year=2020, month=7, day=5, ts=1),
           ]
       )
       df.write.format("hudi").options(**hudi_options).mode("append").save(str(tmp_path))
   
       # Reading back as plain parquet works and prints:
       # [Row(_hoodie_commit_time='20210210160002', _hoodie_commit_seqno='20210210160002_0_1', _hoodie_record_key='id:1', _hoodie_partition_path='2020/7/5', _hoodie_file_name='e8febcc9-58b6-4174-8e83-90842d5492b0-0_0-21-12005_20210210160002.parquet', id=1, year=2020, month=7, day=5, ts=1)]
       read_df = spark.read.format("parquet").load(str(tmp_path) + "/*/*/*")
       print(read_df.collect())
   
       # Reading the same path as Hudi fails with the Py4JJavaError above
       read_df = spark.read.format("hudi").load(str(tmp_path) + "/*/*/*")
       print(read_df.collect())
   ```
   2. Install pytest (I'm using 6.1.1) and run `py.test -s --verbose test_hudi.py`. (NB: `tmp_path` is a pytest fixture that provides a temporary directory; see https://docs.pytest.org/en/stable/tmpdir.html.)
   
   **Expected behavior**
   
   The read should work; I'm not sure why `InMemoryFileIndex` would be missing. Since this is a `NoSuchMethodError` rather than a `ClassNotFoundException`, the class loads but the five-argument constructor Hudi calls isn't there, which points at a version mismatch between the Hudi bundle and the Spark it runs on. Could be something wrong with my setup. I'm using Scala 2.12.10 locally.
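   
   As a quick sanity check (a sketch; both calls are standard PySpark/py4j and Scala APIs), the actual Spark and Scala versions can be printed from the same session:
   
   ```
   # Print the versions the driver JVM is actually running.
   print(spark.version)  # Spark version
   print(spark.sparkContext._jvm.scala.util.Properties.versionString())  # Scala version
   ```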
   
   **Environment Description**
   
   * Hudi version : 0.7.0
   
   * Spark version : 3.0.0
   
   * Hive version : N/A
   
   * Hadoop version : N/A
   
   * Storage (HDFS/S3/GCS..) : Local
   
   * Running on Docker? (yes/no) : no
   
   

