soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1374848788

# Detailed steps I have followed that allow anyone to replicate
@nsivabalan @davidshtian @danny0405
   
   
# Step 1: Download the JARs and upload them into S3

- https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop/1.13.0
- https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink-bundle_2.12/0.10.1/hudi-flink-bundle_2.12-0.10.1.jar
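The download-and-upload step can also be scripted. A minimal sketch follows; the bucket name is a placeholder (not from the original steps), and the commands are printed rather than executed since they require network access and AWS credentials:

```shell
# Hudi Flink bundle used in this report
HUDI_JAR="hudi-flink-bundle_2.12-0.10.1.jar"
HUDI_URL="https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink-bundle_2.12/0.10.1/${HUDI_JAR}"
BUCKET="s3://my-kda-artifacts"  # placeholder bucket -- use your own

# Printed rather than executed: these need network access and AWS credentials
echo "curl -fLO ${HUDI_URL}"
echo "aws s3 cp ${HUDI_JAR} ${BUCKET}/${HUDI_JAR}"
```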
   
# Step 2: Create the KDA application
   
   
![image](https://user-images.githubusercontent.com/39345855/211200378-71b50563-4ca8-4c57-afbc-e30ee86ad9d1.png)
   
### Provide a name for the KDA app
   
![image](https://user-images.githubusercontent.com/39345855/211200437-1af10bac-b2dd-40e6-9686-7c41c2a092c7.png)
   
   ### Select the Glue database 
   
![image](https://user-images.githubusercontent.com/39345855/211200482-49b07851-35fd-4260-9426-691b324becad.png)
   
#### NOTE: I have granted admin access, so there should not be a problem with permissions.
   
### Add the custom JARs we downloaded
   
![image](https://user-images.githubusercontent.com/39345855/211200540-58facbee-50ec-4ca3-abb8-3f6c5eec7fbd.png)
   
   
![image](https://user-images.githubusercontent.com/39345855/211200599-7a362e7c-b4f8-422d-a18d-62566fd0ed58.png)
   
![image](https://user-images.githubusercontent.com/39345855/211200607-f2ec629b-3adf-437b-bc6e-9807c696711f.png)
   
# Step 3: Launch Zeppelin notebooks
   
![image](https://user-images.githubusercontent.com/39345855/211200880-6eb1d210-adcb-4a58-98a2-d3e4fabcec4e.png)
   
# Step 4: Create a Kinesis stream called `stock-streams`
   
   
![image](https://user-images.githubusercontent.com/39345855/211201204-f38b196e-6080-4831-aebb-72feb529ac38.png)
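The stream can also be created from the CLI instead of the console. A sketch follows; the shard count is an assumption (the report does not state it), and the commands are printed rather than executed since they require AWS credentials:

```shell
STREAM="stock-streams"

# Printed rather than executed: these need AWS credentials
echo "aws kinesis create-stream --stream-name ${STREAM} --shard-count 1"
echo "aws kinesis wait stream-exists --stream-name ${STREAM}"
```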
   
##### Add some data to the stream
   
```
import datetime
import json
import os
import random
import time
import uuid

import boto3
from dotenv import load_dotenv

# Load AWS credentials from a local .env file
load_dotenv(".env")


def get_referrer():
    """Generate one fake stock tick."""
    return {
        'uuid': str(uuid.uuid4()),
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2),
    }


# Create the client once, outside the send loop
kinesis_client = boto3.client(
    'kinesis',
    region_name='us-west-2',
    aws_access_key_id=os.getenv("DEV_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("DEV_SECRET_KEY"),
)

while True:
    data = json.dumps(get_referrer())
    res = kinesis_client.put_record(
        StreamName="stock-streams",
        Data=data,
        PartitionKey="1",
    )
    print(data, res)
    time.sleep(2)
```
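As a standalone sanity check (not part of the original script), the record shape emitted above can be verified to line up with the columns of the Flink source table defined in Step 5 (`uuid`, `ticker`, `price`, `event_time`):

```python
import datetime
import json
import random
import uuid


def make_tick():
    # Same record shape as the producer script above
    return {
        "uuid": str(uuid.uuid4()),
        "event_time": datetime.datetime.now().isoformat(),
        "ticker": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
        "price": round(random.random() * 100, 2),
    }


# Round-trip through JSON, as put_record / Flink's json format would see it
record = json.loads(json.dumps(make_tick()))
assert set(record) == {"uuid", "event_time", "ticker", "price"}
assert isinstance(record["price"], float)
# event_time is ISO-8601, matching 'json.timestamp-format.standard' = 'ISO-8601'
datetime.datetime.fromisoformat(record["event_time"])
print("record shape OK")
```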
   
![image](https://user-images.githubusercontent.com/39345855/211201231-b92861c9-4fc0-480a-9312-f966d4c0d81f.png)
   
   
# Step 5: Let's try a hello world on Hudi
```
%flink.ssql(type=update)
DROP TABLE IF EXISTS stock_table;
CREATE TABLE stock_table (
    uuid VARCHAR,
    ticker VARCHAR,
    price DOUBLE,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (ticker)
WITH (
    'connector' = 'kinesis',
    'stream' = 'stock-streams',
    'aws.region' = 'us-west-2',
    'scan.stream.initpos' = 'TRIM_HORIZON',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);
```
   
![image](https://user-images.githubusercontent.com/39345855/211201321-3c2366dc-ddf0-4487-a925-38922a942f43.png)
   
![image](https://user-images.githubusercontent.com/39345855/211201361-446728c9-833e-4708-9b9d-c39ea650392d.png)
```
%flink.ssql(type=update)
DROP TABLE IF EXISTS stock_table_hudi;
CREATE TABLE stock_table_hudi (
    uuid VARCHAR,
    ticker VARCHAR,
    price DOUBLE,
    event_time TIMESTAMP(3),
    PRIMARY KEY (`uuid`) NOT ENFORCED
)
WITH (
    'connector' = 'hudi',
    'path' = 's3://soumilshah-hudi-demos/tmp/',
    'table.type' = 'MERGE_ON_READ'  -- creates a MERGE_ON_READ table; the default is COPY_ON_WRITE
);
```
#### INSERT into Hudi
```
%flink.ssql(type=update)
INSERT INTO stock_table_hudi
SELECT uuid, ticker, price, event_time FROM stock_table;
```
   
   
   # Error Messages 
   ```
java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi
SELECT  uuid, ticker, price, event_time  from stock_table'.
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
    at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
    at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi
SELECT  uuid, ticker, price, event_time  from stock_table'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
    at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
    at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
    ... 14 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
    at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
    at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    ... 1 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
    ... 21 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/10.100.42.225:8082
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
    ... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/10.100.42.225:8082
Caused by: java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:829)
   ```
   
   
Please let me know if you need any other details. I have given all the steps needed to reproduce the problem, and I'm happy to hop on a call or meeting to walk through them. Email: [email protected]
   
   

