soumilshah1995 opened a new issue, #9764:
URL: https://github.com/apache/hudi/issues/9764

   Hello, I am trying to get Apache Hudi and Flink working together locally.
   Here are the steps:
   
   
   
   # Step 1:  Installation Steps 
   ```
   
   conda info --envs
   
   # Create ENV
   conda create -n my-flink-environment pip python=3.8
   
   # Activate ENV
   conda activate my-flink-environment
   
   # Install Flink
   pip install apache-flink
   
   # Install Jupyter Notebook
   pip install jupyter
   
   # Make sure Java 11 is installed
   java -version
   
   ## O/P
   openjdk version "11.0.11" 2021-04-20
   OpenJDK Runtime Environment AdoptOpenJDK-11.0.11+9 (build 11.0.11+9)
   OpenJDK 64-Bit Server VM AdoptOpenJDK-11.0.11+9 (build 11.0.11+9, mixed mode)
   
   
   jupyter notebook
   ```
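Flink in this release line runs on Java 8 or 11, so it can be worth checking the `java -version` banner programmatically before going further. A minimal sketch; the `java_major` helper below is hypothetical, not part of Flink or PyFlink:

```python
import re

def java_major(version_line):
    """Parse the major version out of a `java -version` banner line."""
    m = re.search(r'version "(\d+)(?:\.(\d+))?', version_line)
    if not m:
        return None
    major = int(m.group(1))
    # Pre-Java-9 banners report "1.8.0_xxx"; the real major is the second field.
    return int(m.group(2)) if major == 1 and m.group(2) else major

print(java_major('openjdk version "11.0.11" 2021-04-20'))  # 11
print(java_major('java version "1.8.0_301"'))              # 8
```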
   
   # Step 2: Download the Hudi Flink Bundle Jar
   * https://mvnrepository.com/artifact/org.apache.hudi/hudi-flink1.13-bundle
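Note that the link above points at the `hudi-flink1.13-bundle` artifact, while the code in Step 3 loads `hudi-flink1.16-bundle-0.13.0.jar`; the bundle's Flink line must match the installed `apache-flink` version, or class loading fails at runtime. A small sanity check (the `bundle_flink_version` helper is a hypothetical sketch, not a Hudi API):

```python
import re

def bundle_flink_version(jar_name):
    """Extract the Flink line a Hudi bundle jar targets, e.g. '1.16'."""
    m = re.search(r"hudi-flink(\d+\.\d+)-bundle", jar_name)
    return m.group(1) if m else None

# The bundle must match the installed apache-flink minor version
# (compare against pyflink.__version__ in a real environment).
print(bundle_flink_version("hudi-flink1.16-bundle-0.13.0.jar"))  # 1.16
print(bundle_flink_version("hudi-flink1.13-bundle-0.10.1.jar"))  # 1.13
```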
   
   # Step 3:  Hello world code 
   ```
   from pyflink.table import EnvironmentSettings, TableEnvironment
   import os
   from faker import Faker
   
   # Create a batch TableEnvironment
   env_settings = EnvironmentSettings.in_batch_mode()
   table_env = TableEnvironment.create(env_settings)
   
   # Get the current working directory
   CURRENT_DIR = os.getcwd()
   
   # Define a list of JAR file names you want to add
   jar_files = [
       # "hudi-flink-bundle_2.12-0.10.1.jar",
       # "flink-s3-fs-hadoop-1.16.1.jar",
       "hudi-flink1.16-bundle-0.13.0.jar",
       "flink-sql-connector-kinesis-1.16.1.jar"
   ]
   
   # Build the list of JAR URLs; CURRENT_DIR is already absolute, so
   # 'file://' + path yields a valid file:/// URL (not file:////)
   jar_urls = [f"file://{CURRENT_DIR}/{jar_file}" for jar_file in jar_files]
   
   table_env.get_config().get_configuration().set_string(
       "pipeline.jars",
       ";".join(jar_urls)
   )
   
   # Initialize Faker
   fake = Faker()
   
   # Generate fake data and convert it into a PyFlink table with column names
   data = [(fake.name(), fake.city(), fake.state()) for _ in range(10)]  # Generate 10 rows of fake data
   
   # Define column names
   column_names = ["name", "city", "state"]
   
   # Create a PyFlink table with column names
   table = table_env.from_elements(data, schema=column_names)
   table_env.create_temporary_view('source_table', table)
   
   table_env.execute_sql("SELECT * FROM source_table").print()
   
   """
   Sample output:

   +--------------------------------+--------------------------------+--------------------------------+
   |                           name |                           city |                          state |
   +--------------------------------+--------------------------------+--------------------------------+
   |                   Alex Hoffman |                East Brandonton |                     New Jersey |
   |                Kimberly Hebert |                     East Tanya |                    Connecticut |
   |                    Casey Moore |                     West Erica |                       Oklahoma |
   |                 Joseph Holland |                      Jaredfort |                   Rhode Island |
   |                 Jennifer Smith |                     Port Jesse |                     California |
   |           Christina Valenzuela |                 Heatherchester |                         Oregon |
   |                Jonathan Molina |                       Lake Joy |                         Oregon |
   |                  Allison Lewis |                       Erintown |                       Nebraska |
   |                     John Frost |                    Hughesburgh |                        Montana |
   |                     Amy Malone |                     Aliciaside |                        Montana |
   +--------------------------------+--------------------------------+--------------------------------+
   10 rows in set
   """
   
   # hudi_output_path = 'file://' + os.path.join(os.getcwd(), 'hudi')
   hudi_output_path = 's3a://datateam-sandbox-qa-demo/tmp/'
   
   hudi_sink = f"""
   CREATE TABLE hudi_table (
       name VARCHAR PRIMARY KEY NOT ENFORCED,
       city VARCHAR,
       state VARCHAR
   )
   WITH (
       'connector' = 'hudi',
       'path' = '{hudi_output_path}',
       'table.type' = 'MERGE_ON_READ',
       'hoodie.embed.timeline.server' = 'false'
   );
   """
   table_env.execute_sql(hudi_sink)

   insert_into_hudi_sink_query = "INSERT INTO hudi_table SELECT * FROM source_table"
   table_env.execute_sql(insert_into_hudi_sink_query)
   
   ```
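`pipeline.jars` expects a semicolon-separated list of absolute file URLs, and hand-built f-strings are easy to get wrong (extra slashes, relative paths). A sketch of a safer way to build the value with `pathlib`; the `pipeline_jars_value` helper is hypothetical, not a Flink API:

```python
from pathlib import Path

def pipeline_jars_value(jar_files, base_dir):
    """Join jar paths into the ';'-separated file-URL list Flink's pipeline.jars expects."""
    return ";".join(Path(base_dir, f).resolve().as_uri() for f in jar_files)

print(pipeline_jars_value(["a.jar", "b.jar"], "/opt/jars"))
# on POSIX: file:///opt/jars/a.jar;file:///opt/jars/b.jar
```

The resulting string can then be passed to `table_env.get_config().get_configuration().set_string("pipeline.jars", ...)` as in the code above.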
   
   
   # Error Logs
   ```
   
   
   ---------------------------------------------------------------------------
   Py4JJavaError                             Traceback (most recent call last)
   Cell In[38], line 2
         1 insert_into_hudi_sink_query = "INSERT INTO hudi_table SELECT * FROM source_table"
   ----> 2 table_env.execute_sql(insert_into_hudi_sink_query)

   File ~/anaconda3/envs/my-new-environment/lib/python3.8/site-packages/pyflink/table/table_environment.py:837, in TableEnvironment.execute_sql(self, stmt)
       823 """
       824 Execute the given single statement, and return the execution result.
       825 
      (...)
       834 .. versionadded:: 1.11.0
       835 """
       836 self._before_execute()
   --> 837 return TableResult(self._j_tenv.executeSql(stmt))

   File ~/anaconda3/envs/my-new-environment/lib/python3.8/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
      1316 command = proto.CALL_COMMAND_NAME +\
      1317     self.command_header +\
      1318     args_command +\
      1319     proto.END_COMMAND_PART
      1321 answer = self.gateway_client.send_command(command)
   -> 1322 return_value = get_return_value(
      1323     answer, self.gateway_client, self.target_id, self.name)
      1325 for temp_arg in temp_args:
      1326     if hasattr(temp_arg, "_detach"):

   File ~/anaconda3/envs/my-new-environment/lib/python3.8/site-packages/pyflink/util/exceptions.py:146, in capture_java_exception.<locals>.deco(*a, **kw)
       144 def deco(*a, **kw):
       145     try:
   --> 146         return f(*a, **kw)
       147     except Py4JJavaError as e:
       148         from pyflink.java_gateway import get_gateway

   File ~/anaconda3/envs/my-new-environment/lib/python3.8/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
       324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
       325 if answer[1] == REFERENCE_TYPE:
   --> 326     raise Py4JJavaError(
       327         "An error occurred while calling {0}{1}{2}.\n".
       328         format(target_id, ".", name), value)
       329 else:
       330     raise Py4JError(
       331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
       332         format(target_id, ".", name, value))

   Py4JJavaError: An error occurred while calling o613.executeSql.
   : org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hudi_table'.

   Table options are:

   'connector'='hudi'
   'path'='s3a://datateam-sandbox-qa-demo/tmp/'
   'table.type'='MERGE_ON_READ'
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:270)
        at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:459)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:236)
        at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1803)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:881)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)
        at jdk.internal.reflect.GeneratedMethodAccessor47.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hudi.configuration.FlinkOptions
        at org.apache.hudi.table.HoodieTableFactory.createDynamicTableSink(HoodieTableFactory.java:89)
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:267)
        ... 28 more
   
   
   
   
   ```
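`NoClassDefFoundError: Could not initialize class ...FlinkOptions` usually means the class was found but its static initializer failed (often a Flink/bundle version mismatch), or the jar actually loaded via `pipeline.jars` is not the one intended. Since a jar is just a zip archive, the standard library can verify that a given class entry really is inside the bundle. A minimal sketch, demoed on a synthetic jar because the real bundle is only present locally; `jar_contains_class` is a hypothetical helper:

```python
import os
import tempfile
import zipfile

def jar_contains_class(jar_path, fq_class):
    """Check whether a jar contains the .class entry for a fully-qualified class name."""
    entry = fq_class.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()

# Demo on a synthetic jar; in a real session pass 'hudi-flink1.16-bundle-0.13.0.jar'.
fd, path = tempfile.mkstemp(suffix=".jar")
os.close(fd)
with zipfile.ZipFile(path, "w") as jar:
    jar.writestr("org/apache/hudi/configuration/FlinkOptions.class", b"")
print(jar_contains_class(path, "org.apache.hudi.configuration.FlinkOptions"))  # True
os.remove(path)
```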
   

