soumilshah1995 opened a new issue, #9764: URL: https://github.com/apache/hudi/issues/9764
Hello, I am trying to get Apache Hudi and Flink working together locally. Here are the steps: # Step 1: Installation Steps ``` conda info --envs # Create ENV conda create -n my-flink-environment pip python=3.8 # Activate ENV conda activate my-flink-environment # Install Flink pip install apache-flink # Install Jupyter Notebook pip install jupyter # Make sure java 11 is installed java -version ## O/P openjdk version "11.0.11" 2021-04-20 OpenJDK Runtime Environment AdoptOpenJDK-11.0.11+9 (build 11.0.11+9) OpenJDK 64-Bit Server VM AdoptOpenJDK-11.0.11+9 (build 11.0.11+9, mixed mode) jupyter notebook ``` # Step 2: Download Flink Jar * https://mvnrepository.com/artifact/org.apache.hudi/hudi-flink1.13-bundle # Step 3: Hello world code ``` from pyflink.table import EnvironmentSettings, TableEnvironment import os from faker import Faker # Create a batch TableEnvironment env_settings = EnvironmentSettings.in_batch_mode() table_env = TableEnvironment.create(env_settings) # Get the current working directory CURRENT_DIR = os.getcwd() # Define a list of JAR file names you want to add jar_files = [ # "hudi-flink-bundle_2.12-0.10.1.jar", # "flink-s3-fs-hadoop-1.16.1.jar", "hudi-flink1.16-bundle-0.13.0.jar", "flink-sql-connector-kinesis-1.16.1.jar" ] # Build the list of JAR URLs by prepending 'file:///' to each file name jar_urls = [f"file:///{CURRENT_DIR}/{jar_file}" for jar_file in jar_files] table_env.get_config().get_configuration().set_string( "pipeline.jars", ";".join(jar_urls) ) # Initialize Faker fake = Faker() # Generate fake data and convert it into a PyFlink table with column names data = [(fake.name(), fake.city(), fake.state()) for _ in range(10)] # Generate 10 rows of fake data # Define column names column_names = ["name", "city", "state"] # Create a PyFlink table with column names table = table_env.from_elements(data, schema=column_names) table_env.create_temporary_view('source_table', table) table_env.execute_sql(f"SELECT * FROM source_table ").print() """ 
+--------------------------------+--------------------------------+--------------------------------+ | name | city | state | +--------------------------------+--------------------------------+--------------------------------+ | Alex Hoffman | East Brandonton | New Jersey | | Kimberly Hebert | East Tanya | Connecticut | | Casey Moore | West Erica | Oklahoma | | Joseph Holland | Jaredfort | Rhode Island | | Jennifer Smith | Port Jesse | California | | Christina Valenzuela | Heatherchester | Oregon | | Jonathan Molina | Lake Joy | Oregon | | Allison Lewis | Erintown | Nebraska | | John Frost | Hughesburgh | Montana | | Amy Malone | Aliciaside | Montana | +--------------------------------+--------------------------------+--------------------------------+ 10 rows in set """ # hudi_output_path = 'file://' + os.path.join(os.getcwd(), 'hudi') hudi_output_path = 's3a://datateam-sandbox-qa-demo/tmp/' hudi_sink = f""" CREATE TABLE hudi_table ( name VARCHAR PRIMARY KEY NOT ENFORCED, city VARCHAR, state VARCHAR ) WITH ( 'connector' = 'hudi', 'path' = '{hudi_output_path}' , 'table.type' = 'MERGE_ON_READ' , 'hoodie.embed.timeline.server' = 'false' ); """ table_env.execute_sql(hudi_sink) table_env.execute_sql(hudi_sink) insert_into_hudi_sink_query = "INSERT INTO hudi_table SELECT * FROM source_table" table_env.execute_sql(insert_into_hudi_sink_query) ``` # Error Logs ``` ------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call last) Cell In[38], line 2 1 insert_into_hudi_sink_query = "INSERT INTO hudi_table SELECT * FROM source_table" ----> 2 table_env.execute_sql(insert_into_hudi_sink_query) File ~/anaconda3/envs/my-new-environment/lib/python3.8/site-packages/pyflink/table/table_environment.py:837, in TableEnvironment.execute_sql(self, stmt) 823 """ 824 Execute the given single statement, and return the execution result. 825 (...) 834 .. 
versionadded:: 1.11.0 835 """ 836 self._before_execute() --> 837 return TableResult(self._j_tenv.executeSql(stmt)) File ~/anaconda3/envs/my-new-environment/lib/python3.8/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args) 1316 command = proto.CALL_COMMAND_NAME +\ 1317 self.command_header +\ 1318 args_command +\ 1319 proto.END_COMMAND_PART 1321 answer = self.gateway_client.send_command(command) -> 1322 return_value = get_return_value( 1323 answer, self.gateway_client, self.target_id, self.name) 1325 for temp_arg in temp_args: 1326 if hasattr(temp_arg, "_detach"): File ~/anaconda3/envs/my-new-environment/lib/python3.8/site-packages/pyflink/util/exceptions.py:146, in capture_java_exception.<locals>.deco(*a, **kw) 144 def deco(*a, **kw): 145 try: --> 146 return f(*a, **kw) 147 except Py4JJavaError as e: 148 from pyflink.java_gateway import get_gateway File ~/anaconda3/envs/my-new-environment/lib/python3.8/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name) 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) 325 if answer[1] == REFERENCE_TYPE: --> 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.\n". 328 format(target_id, ".", name), value) 329 else: 330 raise Py4JError( 331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n". 332 format(target_id, ".", name, value)) Py4JJavaError: An error occurred while calling o613.executeSql. : org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hudi_table'. 
Table options are: 'connector'='hudi' 'path'='s3a://datateam-sandbox-qa-demo/tmp/' 'table.type'='MERGE_ON_READ' at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:270) at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:459) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:236) at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1803) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:881) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765) at jdk.internal.reflect.GeneratedMethodAccessor47.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hudi.configuration.FlinkOptions at org.apache.hudi.table.HoodieTableFactory.createDynamicTableSink(HoodieTableFactory.java:89) at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:267) ... 28 more ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
