meatheadmike opened a new issue, #12108:
URL: https://github.com/apache/hudi/issues/12108

   **Describe the problem you faced**
   
   Reading a MERGE_ON_READ Hudi table from a separate Spark session while a Structured Streaming job upserts into it starts failing after a few ingest batches. The reader throws `com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException` while deserializing an Avro `Schema` from Hudi's spillable map during the log/base file merge, and once the error appears every subsequent query against the table fails.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Spin up a Spark cluster on Kubernetes with 1 x driver @ 16 GB / 1 core and 5 x executors @ 16 GB / 1 core.
   2. Submit the PySpark script shown below.
   3. Spin up a separate Spark query node (1 x 16 GB / 1 core) while the ingest is running.
   4. Run a simple query against the Hudi table in a loop, e.g. `spark.sql('refresh table example-table').show(); spark.sql('select count(*) from example-table').show()` (see the reader-loop sketch right after this list).
   5. Wait for 4 or 5 ingest batches to complete. After that, the stack trace below appears instead of the expected record count, and all queries fail from this point on.
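
   The reader-side loop from step 4 looks roughly like the sketch below. Only the two `spark.sql` calls come from the report; the session settings follow the standard Hudi quickstart configuration, and the application name and polling interval are assumptions.

   ```python
   import time
   from pyspark.sql import SparkSession

   # Reader session (sketch): Kryo serializer and the Hudi session extension are
   # the usual Hudi quickstart settings, not copied from the reporter's job.
   spark = (
       SparkSession.builder
       .appName('hudi-query-loop-example')
       .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
       .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
       .enableHiveSupport()
       .getOrCreate()
   )

   while True:
       # Backticks are needed because the example table name contains a hyphen.
       spark.sql('refresh table `example-table`')
       spark.sql('select count(*) from `example-table`').show()
       time.sleep(10)  # polling interval is an assumption
   ```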
   
   **PySpark script**
   ```python
   from pyspark.sql import SparkSession
   from pyspark.sql.functions import *
   from pyspark.sql.types import *
   import dbldatagen as dg
   import json
   import uuid
   import random
   
   spark = (
       SparkSession
       .builder
       .appName('stream-to-hudi-s3-example')
       .getOrCreate()
   )
   
   hudi_db = 'default'
   hudi_table = 'example-table'
   hudi_checkpoint_path = 's3a://my-checkpoint-path'
   hudi_table_path = 's3a://my-table-path' 
   hive_thrift_url = 'thrift://hive-metastore.default:9083'
   hive_jdbc_url = 'jdbc:hive2://hive-metastore.default:10000'
   
   schema = ArrayType(
       StructType([
           StructField("domain", StringType(), False),
           StructField("risk", StringType(), False),
           StructField("timestamp", TimestampType(), False),
       ])
   )
   
   multi_writer_id = 'datagen-writer1'
   @udf(returnType=StringType())
   def generate_domain(): 
       rand_domain = f"{random.randrange(10000000,20000000)}.com"
       return rand_domain
   
   @udf(returnType=LongType())
   def generate_timestamp():
       return random.randrange(1000000000,2000000000)
   
   @udf(returnType=StringType())
   def generate_risk():
       return json.dumps({"blaa":str(uuid.uuid4())})
    
   ds = (
            dg.DataGenerator(spark, name="test-data-set", partitions=1)
            .withColumn("offset", "long", minValue=1, maxValue=9999999, 
random=True)
            .withColumn("timestamp_", "timestamp", random=True)
            .build(withStreaming=True, options={'rowsPerSecond': 10000, 
'rampUpTimeSeconds':60})
            .withColumn("domain", generate_domain())
            .withColumn("timestamp", generate_timestamp())
            .withColumn("risk", generate_risk())
            .withColumnRenamed("timestamp_","kafka_timestamp")
       ) 
   
   df = (
            ds.select(col("offset"), col("kafka_timestamp"), col("domain"), 
col("timestamp"), col("risk"))
            .na.drop()
        )
   
   
   hudi_precombine_field = 'timestamp'
   hudi_recordkey_field = 'domain'
   
   hudi_options = {
       'hoodie.archive.async': True,
       'hoodie.clean.async.enabled': True,
       'hoodie.clean.automatic': True,
       'hoodie.clean.commits.retained': 5,
       'hoodie.clean.policy': 'KEEP_LATEST_COMMITS',
       'hoodie.clean.fileversions.retained': '2',
       'hoodie.cleaner.policy.failed.writes': 'LAZY',
       'hoodie.clustering.async.enabled': False,
       'hoodie.clustering.async.max.commits': 2,
       'hoodie.clustering.execution.strategy.class': 'org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy',
       'hoodie.clustering.inline': True,
       'hoodie.clustering.inline.max.commits': 2,
       'hoodie.clustering.plan.strategy.class': 'org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy',
       'hoodie.clustering.plan.strategy.sort.columns': hudi_recordkey_field,
       'hoodie.clustering.preserve.commit.metadata': True,
       'hoodie.clustering.rollback.pending.replacecommit.on.conflict': True,
       'hoodie.clustering.updates.strategy': 'org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy',
       'hoodie.compact.inline.max.delta.commits': 2,
       'hoodie.datasource.hive_sync.db': hudi_db,
       'hoodie.datasource.hive_sync.enable':True,
       'hoodie.datasource.hive_sync.ignore_exceptions': True,
       'hoodie.datasource.hive_sync.jdbcurl':hive_jdbc_url,
       'hoodie.datasource.hive_sync.metastore.uris':hive_thrift_url,
       'hoodie.datasource.hive_sync.mode':'hms',
       'hoodie.datasource.hive_sync.partition_extractor_class':'org.apache.hudi.hive.NonPartitionedExtractor',
       'hoodie.datasource.hive_sync.password':'',
       'hoodie.datasource.hive_sync.recreate_table_on_error': True,
       'hoodie.datasource.hive_sync.skip_ro_suffix':True,
       'hoodie.datasource.hive_sync.table':hudi_table,
       'hoodie.datasource.hive_sync.username':'hive',
       'hoodie.datasource.meta.sync.enable': 'true',
       'hoodie.datasource.read.incr.fallback.fulltablescan.enable': True,
       'hoodie.datasource.read.use.new.parquet.file.format': True,
       'hoodie.datasource.write.hive_style_partitioning': 'true',
       'hoodie.datasource.write.operation': 'upsert',
       'hoodie.datasource.write.precombine.field': hudi_precombine_field,
       'hoodie.datasource.write.reconcile.schema':'true',
       'hoodie.datasource.write.record.merger.impls': 'org.apache.hudi.HoodieSparkRecordMerger',
       'hoodie.datasource.write.recordkey.field': hudi_recordkey_field,
       'hoodie.datasource.write.row.writer.enable': True,
       'hoodie.datasource.write.streaming.checkpoint.identifier': multi_writer_id,
       'hoodie.datasource.write.streaming.ignore.failed.batch': 'true',
       'hoodie.datasource.write.table.name': hudi_table,
       'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
       'hoodie.enable.data.skipping': True,
       'hoodie.index.type': 'RECORD_INDEX',
       'hoodie.keep.min.commits':'10',
       'hoodie.logfile.data.block.format':'parquet',
       'hoodie.merge.use.record.positions': True,
       'hoodie.metadata.auto.initialize': True,
       'hoodie.metadata.enable': True,
       'hoodie.metadata.clean.async': True,
       'hoodie.metadata.index.async': False,  # DO NOT SET TRUE!!! Record and column indexes will not be created!
       'hoodie.metadata.index.column.stats.columns': hudi_recordkey_field,
       'hoodie.metadata.index.column.stats.column.list': hudi_recordkey_field,
       'hoodie.metadata.index.column.stats.enable': True,
       'hoodie.metadata.record.index.enable': True,
       'hoodie.parquet.avro.write-old-list-structure':'false',
       'hoodie.parquet.compression.codec': 'snappy',
       'hoodie.record.index.use.caching':True,
       'hoodie.schema.on.read.enable': True,
       'hoodie.table.name': hudi_table,
       'hoodie.table.services.enabled': True,
       'hoodie.write.concurrency.early.conflict.detection.enable': True,
       'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
       'hoodie.write.executor.type': 'DISRUPTOR',
       'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider',
       'hoodie.write.lock.wait_time_ms': '300000',
       'hoodie.write.lock.zookeeper.base_path': '/hudi/local/table',
       'hoodie.write.lock.zookeeper.lock_key': f'{hudi_table}',
       'hoodie.write.lock.zookeeper.port': '2181',
       'hoodie.write.lock.zookeeper.url': 'zk-cs.default',
       'hoodie.write.set.null.for.missing.columns': True, 
       'checkpointLocation': hudi_checkpoint_path,
       'parquet.avro.write-old-list-structure': 'false',
       'path': hudi_table_path,
   }
   
   print(f"hudi_options={hudi_options}")
   
   df.writeStream \
      .format("org.apache.hudi") \
      .options(**hudi_options) \
      .outputMode("append") \
      .start() 
   
   spark.streams.awaitAnyTermination()
   ``` 
   
   **Expected behavior**
   
   Queries should succeed without crashing.
   
   **Environment Description**
   
   * Hudi version : 1.0.0-beta2
   
   * Spark version : 3.5.3
   
   * Hive version : 3.1.3
   
   * Hadoop version : 3.3.4
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : yes
   
   
   **Additional context**
   
   I've been trying to narrow down the cause of this without much success. I disabled clustering, but the error still occurs. It looks like something goes wrong on the read side when log and base files are merged. It doesn't seem to crash if every record key is 100% unique, but of course in the real world I need to be able to upsert data.
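
   For reference, the "every record key is 100% unique" case mentioned above can be forced by swapping the random numeric domain generator for a UUID-based one, so no key ever repeats. This is only an illustrative sketch, not the exact change used during testing:

   ```python
   import uuid
   from pyspark.sql.functions import udf
   from pyspark.sql.types import StringType

   @udf(returnType=StringType())
   def generate_unique_domain():
       # A UUID per record means the 'domain' record key never collides, so the
       # MOR read path never has to merge a log record into a base-file record.
       return f"{uuid.uuid4()}.com"

   # In the ingest script above, substitute generate_unique_domain() for
   # generate_domain() to approximate the unique-key case.
   ```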
   
   **Stacktrace**
   
   ```
   24/10/15 20:53:21 INFO TaskSetManager: Finished task 2.0 in stage 77.0 (TID 307) in 6029 ms on mike-test-spark-app-74d7488c9d-kzm2z (executor driver) (14/15)
   24/10/15 20:53:22 INFO HoodieLogFileReader: Closing Log file reader .cbd4e0ac-3d14-4a38-9199-6800a4e25946-0_20241015205126562.log.1_6-361-2513
   24/10/15 20:53:22 INFO HoodieMergedLogRecordReader: Number of log files scanned => 1
   24/10/15 20:53:22 INFO HoodieMergedLogRecordReader: Number of entries in Map => 233427
   24/10/15 20:53:22 INFO InternalParquetRecordReader: at row 0. reading next block
   24/10/15 20:53:23 INFO InternalParquetRecordReader: block read in memory in 1023 ms. row count = 1686704
   24/10/15 20:53:23 ERROR Executor: Exception in task 0.0 in stage 77.0 (TID 305)
   com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
   Serialization trace:
   reserved (org.apache.avro.Schema$Field)
   fieldMap (org.apache.avro.Schema$RecordSchema)
   right (org.apache.hudi.common.util.collection.ImmutablePair)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
        at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:103)
        at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:77)
        at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:209)
        at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:202)
        at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:198)
        at org.apache.hudi.common.util.collection.BitCaskDiskMap.remove(BitCaskDiskMap.java:245)
        at org.apache.hudi.common.util.collection.BitCaskDiskMap.remove(BitCaskDiskMap.java:67)
        at org.apache.hudi.common.util.collection.ExternalSpillableMap.remove(ExternalSpillableMap.java:240)
        at org.apache.hudi.common.table.read.HoodieKeyBasedFileGroupRecordBuffer.hasNextBaseRecord(HoodieKeyBasedFileGroupRecordBuffer.java:125)
        at org.apache.hudi.common.table.read.HoodieKeyBasedFileGroupRecordBuffer.doHasNext(HoodieKeyBasedFileGroupRecordBuffer.java:135)
        at org.apache.hudi.common.table.read.HoodieBaseFileGroupRecordBuffer.hasNext(HoodieBaseFileGroupRecordBuffer.java:130)
        at org.apache.hudi.common.table.read.HoodieFileGroupReader.hasNext(HoodieFileGroupReader.java:201)
        at org.apache.hudi.common.table.read.HoodieFileGroupReader$HoodieFileGroupReaderIterator.hasNext(HoodieFileGroupReader.java:262)
        at org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat$$anon$1.hasNext(HoodieFileGroupReaderBasedParquetFileFormat.scala:250)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
   Caused by: java.lang.UnsupportedOperationException
        at java.base/java.util.Collections$UnmodifiableCollection.add(Collections.java:1067)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        ... 49 more
   ```
   
   

