schlichtanders commented on issue #6808:
URL: https://github.com/apache/hudi/issues/6808#issuecomment-1264629689

Thank you for your help. Have you tried to replicate this on your side?
   
I added those configs, so that Spark SQL now reports the following:
```
+--------------------------------------------------------------+------------------------------------+
|key                                                           |value                               |
+--------------------------------------------------------------+------------------------------------+
|spark.hadoop.hive.metastore.schema.verification               |false                               |
|spark.hadoop.hive.metastore.schema.verification.record.version|false                               |
|spark.hadoop.javax.jdo.option.ConnectionDriverName            |org.apache.derby.jdbc.EmbeddedDriver|
|spark.hadoop.javax.jdo.option.ConnectionURL                   |*********(redacted)                 |
+--------------------------------------------------------------+------------------------------------+
```
   
Now the code runs through, but no tables get registered. As a test, I ran `df.write.saveAsTable("saveastable_table")`, which indeed works.
   
I won't have the resources to debug this further. I switched to Delta Lake, which works out of the box with Spark's local metastore (the one enabled automatically by merely setting `spark.sql.catalogImplementation=hive`, or equivalently by calling `SparkSession.builder.enableHiveSupport()`).

Being able to test Hudi locally against a local metastore is really crucial for us. It would be great if Hudi could support this in the future, ideally with a working example in the documentation. In the best case, the default Spark metastore would just work out of the box, as it does for Delta Lake.
   
   
   --------------
   
# Details of my current attempt
   
```python
from pyspark.sql import SparkSession
from pathlib import Path
import os

os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([
    # hudi config
    "--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.0",
    "--conf spark.serializer=org.apache.spark.serializer.KryoSerializer",
    "--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
    # "--conf spark.sql.hive.convertMetastoreParquet=false",  # taken from AWS example
    # others
    # "--conf spark.hadoop.hive.metastore.uris=jdbc:derby:;databaseName=metastore_db;create=true",
    # "--conf spark.hadoop.hive.metastore.uris=''",
    # f"--conf spark.sql.warehouse.dir={Path('.').absolute() / 'metastore_warehouse'}",
    # "--conf spark.eventLog.enabled=false",
    "--conf spark.sql.catalogImplementation=hive",
    "--conf spark.hadoop.hive.metastore.schema.verification=false",
    "--conf spark.hadoop.hive.metastore.schema.verification.record.version=false",
    "--conf spark.hadoop.javax.jdo.option.ConnectionDriverName='org.apache.derby.jdbc.EmbeddedDriver'",
    "--conf spark.hadoop.javax.jdo.option.ConnectionURL='jdbc:derby:memory:myInMemDB;create=true'",
    "--conf spark.hadoop.datanucleus.schema.autoCreateTables=true",
    # f"--conf spark.sql.warehouse.dir={Path('.').absolute() / 'metastore_warehouse'}",
    # f"--conf spark.sql.hive.metastore.warehouse.dir={Path('.').absolute() / 'metastore_warehouse'}",
    # necessary last string
    "pyspark-shell",
])
os.environ["PYSPARK_SUBMIT_ARGS"]

spark = SparkSession.builder.getOrCreate()
spark.sql("set").filter("key rlike 'metastore|jdo'").show(1000, False)

sc = spark.sparkContext
sc.setLogLevel("WARN")

dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
    dataGen.generateInserts(10)
)

from pyspark.sql.functions import expr

df = spark.read.json(spark.sparkContext.parallelize(inserts, 10)).withColumn(
    "part", expr("'foo'")
)
df.toPandas()

tableName = "test_hudi_pyspark_local"
basePath = f"{Path('.').absolute()}/tmp/{tableName}"

hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.partitionpath.field": "part",
    "hoodie.datasource.write.table.name": tableName,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "ts",
    # "hoodie.upsert.shuffle.parallelism": 2,
    # "hoodie.insert.shuffle.parallelism": 2,
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": tableName,
    "hoodie.datasource.hive_sync.enable": "true",
    # "hoodie.datasource.meta.sync.enable": "true",
    # "hoodie.datasource.hive_sync.mode": "hiveql",
    "hoodie.datasource.hive_sync.mode": "hms",
    # "hoodie.datasource.hive_sync.mode": "jdbc",
    # "hoodie.datasource.hive_sync.username": "APP",
    # "hoodie.datasource.hive_sync.use_jdbc": "false",
    # "hoodie.datasource.hive_sync.jdbcurl": f"jdbc:derby:;databaseName={Path('.').absolute() / 'metastore_db'};create=true",
    # "hoodie.datasource.hive_sync.jdbcurl": "jdbc:derby:;databaseName=metastore_db;create=true",
    "hoodie.datasource.hive_sync.partition_fields": "part",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "index.global.enabled": "true",
    "hoodie.index.type": "GLOBAL_BLOOM",
}

df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath)
spark.sql("Show tables from default").toPandas()

df.write.saveAsTable("saveastable_table")
spark.sql("Show tables from default").toPandas()
```
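As an aside, hand-assembling `PYSPARK_SUBMIT_ARGS` like this is easy to get wrong (the quoting of `--conf` values, and the mandatory trailing `pyspark-shell` token). A small helper along these lines (hypothetical, shown only to illustrate how the string above is assembled) keeps it uniform:

```python
def build_submit_args(packages, confs):
    """Assemble a PYSPARK_SUBMIT_ARGS string from a package list and a conf dict."""
    parts = [f"--packages {','.join(packages)}"]
    # each Spark property becomes one "--conf key=value" pair
    parts += [f"--conf {key}={value}" for key, value in confs.items()]
    parts.append("pyspark-shell")  # PySpark requires this as the final token
    return " ".join(parts)

args = build_submit_args(
    ["org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.0"],
    {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.catalogImplementation": "hive",
    },
)
```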
   
The Hudi write outputs
```
22/10/02 14:12:44 WARN HoodieSparkSqlWriter$: hoodie table at /home/ssahm/Projects_Freelance/Fielmann/bi_kls_data_ocean_import/src/tmp/tmp/test_hudi_pyspark_local already exists. Deleting existing data & overwriting with new data.
22/10/02 14:12:44 WARN HoodieBackedTableMetadata: Metadata table was not found at path /home/ssahm/Projects_Freelance/Fielmann/bi_kls_data_ocean_import/src/tmp/tmp/test_hudi_pyspark_local/.hoodie/metadata
```
and if, instead of `hms`, I enable
```
"hoodie.datasource.hive_sync.mode": "jdbc",
"hoodie.datasource.hive_sync.use_jdbc": "true",
```
I even get an error:
   ```
   
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In [16], line 29
      2 basePath = f"{Path('.').absolute()}/tmp/{tableName}"
      4 hudi_options = {
      5     "hoodie.table.name": tableName,
      6     "hoodie.datasource.write.recordkey.field": "uuid",
   (...)
     27     "hoodie.index.type": "GLOBAL_BLOOM",
     28 }
---> 29 (df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))

File ~/Projects_Freelance/Fielmann/bi_kls_data_ocean_import/src/.venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py:740, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
    738     self._jwrite.save()
    739 else:
--> 740     self._jwrite.save(path)

File ~/Projects_Freelance/Fielmann/bi_kls_data_ocean_import/src/.venv/lib/python3.9/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File ~/Projects_Freelance/Fielmann/bi_kls_data_ocean_import/src/.venv/lib/python3.9/site-packages/pyspark/sql/utils.py:111, in capture_sql_exception.<locals>.deco(*a, **kw)
    109 def deco(*a, **kw):
    110     try:
--> 111         return f(*a, **kw)
    112     except py4j.protocol.Py4JJavaError as e:
    113         converted = convert_exception(e.java_exception)

File ~/Projects_Freelance/Fielmann/bi_kls_data_ocean_import/src/.venv/lib/python3.9/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o217.save.
: org.apache.hudi.exception.HoodieException: Could not sync using the meta sync class org.apache.hudi.hive.HiveSyncTool
     at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:58)
     at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$2(HoodieSparkSqlWriter.scala:648)
     at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$2$adapted(HoodieSparkSqlWriter.scala:647)
     at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
     at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:647)
     at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:734)
     at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:338)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:183)
     at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
     at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
     at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
     at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
     at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
     at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
     at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
     at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
     at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
     at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
     at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
     at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:498)
     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
     at py4j.Gateway.invoke(Gateway.java:282)
     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
     at py4j.commands.CallCommand.execute(CallCommand.java:79)
     at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
     at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
     at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.hive.HiveSyncTool
     at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:91)
     at org.apache.hudi.sync.common.util.SyncUtilHelpers.instantiateMetaSyncTool(SyncUtilHelpers.java:75)
     at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:56)
     ... 48 more
Caused by: java.lang.reflect.InvocationTargetException
     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
     at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:89)
     ... 50 more
Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Got runtime exception when hive syncing
     at org.apache.hudi.hive.HiveSyncTool.initSyncClient(HiveSyncTool.java:106)
     at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:95)
     ... 55 more
Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Failed to create HiveMetaStoreClient
     at org.apache.hudi.hive.HoodieHiveSyncClient.<init>(HoodieHiveSyncClient.java:95)
     at org.apache.hudi.hive.HiveSyncTool.initSyncClient(HiveSyncTool.java:101)
     ... 56 more
Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Cannot create hive connection jdbc:hive2://localhost:10000/
     at org.apache.hudi.hive.ddl.JDBCExecutor.createHiveConnection(JDBCExecutor.java:107)
     at org.apache.hudi.hive.ddl.JDBCExecutor.<init>(JDBCExecutor.java:59)
     at org.apache.hudi.hive.HoodieHiveSyncClient.<init>(HoodieHiveSyncClient.java:85)
     ... 57 more
Caused by: java.sql.SQLException: Could not open client transport with JDBC Uri: jdbc:hive2://localhost:10000: java.net.ConnectException: Connection refused (Connection refused)
     at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:224)
     at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:107)
     at java.sql.DriverManager.getConnection(DriverManager.java:664)
     at java.sql.DriverManager.getConnection(DriverManager.java:247)
     at org.apache.hudi.hive.ddl.JDBCExecutor.createHiveConnection(JDBCExecutor.java:104)
     ... 59 more
Caused by: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused (Connection refused)
     at org.apache.thrift.transport.TSocket.open(TSocket.java:226)
     at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:266)
     at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:38)
     at org.apache.hive.jdbc.HiveConnection.openTransport(HiveConnection.java:311)
     at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:196)
     ... 63 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
     at java.net.PlainSocketImpl.socketConnect(Native Method)
     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
     at java.net.Socket.connect(Socket.java:607)
     at org.apache.thrift.transport.TSocket.open(TSocket.java:221)
     ... 67 more
   
   ```
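For what it's worth, the bottom of that trace is a plain TCP-level failure: in `jdbc` sync mode the sync tool dials `jdbc:hive2://localhost:10000` (the HiveServer2 default endpoint), and nothing is listening there in a plain local setup. This can be confirmed without Spark at all with a stdlib probe (illustrative only, not Hudi code):

```python
import socket

def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds, False otherwise."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers ConnectionRefusedError and timeouts
        return False

# On a machine without a running HiveServer2 this returns False,
# matching the "Connection refused" at the root of the stack trace.
hiveserver2_reachable = port_open("localhost", 10000)
```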

