rmnlchh commented on issue #9282:
URL: https://github.com/apache/hudi/issues/9282#issuecomment-1650228180

   > @rmnlchh Just curious, Did you set these configs
   > 
   > ```
   > sc.set("spark.sql.legacy.parquet.nanosAsLong", "false");
   > sc.set("spark.sql.parquet.binaryAsString", "false");
   > sc.set("spark.sql.parquet.int96AsTimestamp", "true");
   > sc.set("spark.sql.caseSensitive", "false");
   > ```
   > 
   > with your deltastreamer also? I will try to reproduce this issue .
   
   Yes, adding all the DS configs
   println(s"hoodieDeltaStreamerConfig=$hoodieDeltaStreamerConfig")
   println(s"typedProperties=$typedProperties")
   println("HERE JSC" + jsc.getConf.getAll.mkString)
   val hoodieDeltaStreamer = new HoodieDeltaStreamer(hoodieDeltaStreamerConfig, 
jsc
        , FSUtils.getFs(hoodieDeltaStreamerConfig.targetBasePath, conf), 
jsc.hadoopConfiguration
        , org.apache.hudi.common.util.Option.of(typedProperties)
   )
   
hoodieDeltaStreamerConfig=Config{targetBasePath='/XXX/cdp-datapipeline-curation/cdp-datapipeline-curation/datalake-deltastreamer/./tmp/CreativeDeltaStreamerTest/Domain=CampaignBuild/Table=published_creative/',
 targetTableName='published_creative', tableType='MERGE_ON_READ', 
baseFileFormat='PARQUET', 
propsFilePath='file://XXX/cdp-datapipeline-curation/cdp-datapipeline-curation/datalake-deltastreamer/src/test/resources/delta-streamer-config/dfs-source.properties',
 configs=[], 
sourceClassName='org.apache.hudi.utilities.sources.AvroKafkaSource', 
sourceOrderingField='AssetValue', 
payloadClassName='org.apache.hudi.common.model.OverwriteWithLatestAvroPayload', 
schemaProviderClassName='com.cardlytics.datapipeline.deltastreamer.schema.ResourceBasedSchemaProvider',
 
transformerClassNames=[org.apache.hudi.utilities.transform.SqlQueryBasedTransformer],
 sourceLimit=9223372036854775807, operation=UPSERT, filterDupes=false, 
enableHiveSync=false, enableMetaSync=false, forceEmptyMetaSync=false, syn
 cClientToolClassNames=org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool, 
maxPendingCompactions=5, maxPendingClustering=5, continuousMode=false, 
minSyncIntervalSeconds=0, sparkMaster='', commitOnErrors=false, 
deltaSyncSchedulingWeight=1, compactSchedulingWeight=1, 
clusterSchedulingWeight=1, deltaSyncSchedulingMinShare=0, 
compactSchedulingMinShare=0, clusterSchedulingMinShare=0, 
forceDisableCompaction=true, checkpoint='null', 
initialCheckpointProvider='null', help=false}
   typedProperties={spark.sql.avro.compression.codec=snappy, 
hoodie.datasource.hive_sync.table=published_creative, 
hoodie.datasource.hive_sync.partition_fields=Entity, 
hoodie.metadata.index.column.stats.enable=false, hoodie.index.type=BLOOM, 
hoodie.datasource.write.reconcile.schema=true, 
hoodie.deltastreamer.schemaprovider.source.schema.file=domain/campaignbuild/schema/creative.avsc,
 bootstrap.servers=PLAINTEXT://localhost:34873, hoodie.compact.inline=false, 
hoodie.deltastreamer.transformer.sql=
   SELECT
       'Creative' Entity
       ,o.CreativeId
       ,o.PreMessageImpression
       ,o.PostMessageImpression
       ,o.Assets.Type AssetType
       ,o.Assets.Slot AssetSlot
       ,o.Assets.Label AssetLabel
       ,o.Assets.Value AssetValue
   FROM
       (SELECT a.CreativeId, a.PreMessageImpression, a.PostMessageImpression, 
explode(a.Assets) Assets
        FROM
        <SRC> a) o
            , hoodie.parquet.max.file.size=6291456, 
hoodie.datasource.write.recordkey.field=CreativeId,AssetSlot, 
hoodie.index.bloom.num_entries=60000, 
hoodie.datasource.hive_sync.support_timestamp=true, 
hoodie.metadata.enable=false, schema.registry.url=http://localhost:34874, 
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator,
 hoodie.datasource.write.table.type=MERGE_ON_READ, 
hoodie.deltastreamer.source.kafka.topic=CMPN-CmpnPub-AdServer-Creative, 
hoodie.datasource.write.hive_style_partitioning=true, 
hoodie.metadata.insert.parallelism=1, 
hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false, 
hoodie.parquet.compression.codec=snappy, spark.io.compression.codec=snappy, 
hoodie.deltastreamer.schemaprovider.target.schema.file=domain/campaignbuild/schema/published_creative_table.json,
 hoodie.bloom.index.prune.by.ranges=true, 
hoodie.datasource.write.partitionpath.field=Entity, 
hoodie.datasource.write.keygenerator.consistent.logical.time
 stamp.enabled=true, hoodie.parquet.block.size=6291456, 
hoodie.cleaner.fileversions.retained=2, hoodie.table.name=published_creative, 
hoodie.upsert.shuffle.parallelism=4, 
hoodie.meta.sync.client.tool.class=org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool,
 spark.sql.parquet.compression.codec=snappy, 
hoodie.datasource.write.precombine.field=AssetValue, 
hoodie.datasource.write.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload,
 
hoodie.datasource.meta.sync.base.path=/XXX/cdp-datapipeline-curation/cdp-datapipeline-curation/datalake-deltastreamer/./tmp/CreativeDeltaStreamerTest/Domain=CampaignBuild/Table=published_creative/,
 hoodie.datasource.hive_sync.database=datalake_ods_local, 
hoodie.datasource.write.operation=upsert, auto.offset.reset=earliest, 
hoodie.metadata.index.column.stats.parallelism=1, 
hoodie.compact.inline.max.delta.commits=5}
   HERE 
JSC(spark.sql.hive.convertMetastoreParquet,false)(spark.driver.extraJavaOptions,-XX:+IgnoreUnrecognizedVMOptions
 --add-opens=java.base/java.lang=ALL-UNNAMED 
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED 
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED 
--add-opens=java.base/java.io=ALL-UNNAMED 
--add-opens=java.base/java.net=ALL-UNNAMED 
--add-opens=java.base/java.nio=ALL-UNNAMED 
--add-opens=java.base/java.util=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED 
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED 
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED 
--add-opens=java.base/sun.security.action=ALL-UNNAMED 
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED 
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED)(spark.sql.parquet.datetimeRebaseModeInRead,LEGACY)(spark.sql.legacy.timeParserPolicy,LEGACY)(spark.memory.fraction,0.50)(spark.sql.parquet.outputTimestampType,TIMESTAMP_MICROS)(spark.
 
driver.port,44679)(spark.sql.parquet.int96AsTimestamp,true)(spark.sql.parquet.binaryAsString,false)(spark.memory.storageFraction,0.20)(spark.serializer,org.apache.spark.serializer.KryoSerializer)(spark.sql.avro.datetimeRebaseModeInWrite,LEGACY)(spark.executor.memoryOverheadFactor,0.20)(spark.sql.parquet.compression.codec,snappy)(spark.master,local[*])(spark.sql.caseSensitive,false)(spark.app.name,CreativeDeltaStreamerTest-creative-deltastreamer-1690266341)(spark.app.id,local-1690280742465)(spark.sql.extension,org.apache.spark.sql.hudi.HoodieSparkSessionExtension)(spark.sql.legacy.parquet.nanosAsLong,false)(spark.io.compression.codec,snappy)(spark.driver.host,127.0.0.1)(spark.io.compression.zstd.bufferSize,64k)(spark.shuffle.compress,true)(spark.sql.avro.datetimeRebaseModeInRead,LEGACY)(spark.sql.avro.compression.codec,snappy)(spark.app.startTime,1690280741849)(spark.executor.id,driver)(spark.sql.parquet.int96RebaseModeInRead,LEGACY)(spark.shuffle.spill.compress,true)(spark.rdd.compr
 
ess,true)(spark.sql.adaptive.skewJoin.enabled,true)(spark.broadcast.compress,true)(spark.sql.parquet.datetimeRebaseModeInWrite,LEGACY)(spark.io.compression.zstd.level,1)(spark.driver.memoryOverheadFactor,0.15)(spark.sql.catalog,org.apache.spark.sql.hudi.catalog.HoodieCatalog)(spark.executor.extraJavaOptions,-XX:+IgnoreUnrecognizedVMOptions
 --add-opens=java.base/java.lang=ALL-UNNAMED 
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED 
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED 
--add-opens=java.base/java.io=ALL-UNNAMED 
--add-opens=java.base/java.net=ALL-UNNAMED 
--add-opens=java.base/java.nio=ALL-UNNAMED 
--add-opens=java.base/java.util=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED 
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED 
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED 
--add-opens=java.base/sun.security.action=ALL-UNNAMED 
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.securi
 
ty.jgss/sun.security.krb5=ALL-UNNAMED)(spark.executor.cores,1)(spark.io.compression.lz4.blockSize,64k)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to