soumilshah1995 commented on issue #7591: URL: https://github.com/apache/hudi/issues/7591#issuecomment-1374848788
# Detailed Steps i have followed that allows anyone to replicate @nsivabalan @davidshtian @danny0405 # Step 1: Download and Upload the JAR into S3 • https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop/1.13.0 • https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink-bundle_2.12/0.10.1/hudi-flink-bundle_2.12-0.10.1.jar # Step 2: Create KDA Application  ### Provide Name to KDA APP  ### Select the Glue database  #### NOTE i have given Admin Access there should not be problem with access ### Add Custom JAR we downloaded    # Step 3: Launch Zeplin Notebooks  # Step 4: Create a Kinesis Stream called stock-streams  ##### Add some data into streams ``` try: import datetime import json import random import boto3 import os import uuid import time from faker import Faker from dotenv import load_dotenv load_dotenv(".env") except Exception as e: pass global faker faker = Faker() def getReferrer(): data = {} now = datetime.datetime.now() str_now = now.isoformat() data['uuid'] = str(uuid.uuid4()) data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data = json.dumps(getReferrer()) global kinesis_client kinesis_client = boto3.client('kinesis', region_name='us-west-2', aws_access_key_id=os.getenv("DEV_ACCESS_KEY"), aws_secret_access_key=os.getenv("DEV_SECRET_KEY") ) res = kinesis_client.put_record( StreamName="stock-streams", Data=data, PartitionKey="1") print(data, " " , res) time.sleep(2) ```  # Step 5: Lets go try hello world on HUDI ``` %flink.ssql(type=update) DROP TABLE if exists stock_table; CREATE TABLE stock_table ( uuid varchar, ticker VARCHAR, price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = 'stock-streams', 'aws.region' = 'us-west-2', 'scan.stream.initpos' = 'TRIM_HORIZON', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ); ```   ``` %flink.ssql(type=update) DROP TABLE if exists stock_table_hudi; CREATE TABLE stock_table_hudi( uuid varchar , ticker VARCHAR, price DOUBLE, event_time TIMESTAMP(3), PRIMARY KEY (`uuid`) NOT Enforced ) WITH ( 'connector' = 'hudi', 'path' = 's3://soumilshah-hudi-demos/tmp/', 'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE ); ``` #### INSERT Into HUDI ``` %flink.ssql(type=update) INSERT INTO stock_table_hudi SELECT uuid, ticker, price, event_time from stock_table; `` # Error Messages ``` java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi SELECT uuid, ticker, price, event_time from stock_table'. at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538) at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112) at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744) at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132) at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi SELECT uuid, ticker, price, event_time from stock_table'. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970) at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848) at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532) ... 14 more Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405) at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ... 1 more Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted. at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386) ... 21 more Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/10.100.42.225:8082 at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063) ... 19 more Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/10.100.42.225:8082 Caused by: java.net.ConnectException: Connection refused at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777) at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.base/java.lang.Thread.run(Thread.java:829) ``` please let me know if you guys need any other details i have given all steps which help to rectify the problem and solve it happy to hop on call or meeting to show you steps too Email [email protected] -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
