soumilshah1995 opened a new issue, #9783:
URL: https://github.com/apache/hudi/issues/9783

   
   Steps
   # Step 1: Create Kinesis Streams
   
   ```
   import boto3
   
   
   def create_kinesis_stream(stream_name, shard_count):
       try:
           # Initialize the Kinesis client
           kinesis_client = boto3.client('kinesis')
   
           # Create the Kinesis stream
           response = kinesis_client.create_stream(
               StreamName=stream_name,
               ShardCount=shard_count
           )
   
           # Check for successful response
           if response['ResponseMetadata']['HTTPStatusCode'] == 200:
               print(f"Kinesis stream '{stream_name}' created with 
{shard_count} shard(s)")
           else:
               print("Failed to create Kinesis stream")
   
       except Exception as e:
           print(f"Error: {str(e)}")
   
   def delete_kinesis_stream(stream_name):
       try:
           # Initialize the Kinesis client
           kinesis_client = boto3.client('kinesis')
   
           # Delete the Kinesis stream
           response = kinesis_client.delete_stream(
               StreamName=stream_name
           )
   
           # Check for successful response
           if response['ResponseMetadata']['HTTPStatusCode'] == 200:
               print(f"Kinesis stream '{stream_name}' deleted successfully")
           else:
               print("Failed to delete Kinesis stream")
   
       except Exception as e:
           print(f"Error: {str(e)}")
   
   create_kinesis_stream("stocks-stream", 1)
   ```
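   
   Note that `create_stream` is asynchronous, so the stream may still be in `CREATING` status when Step 2 runs. Below is a minimal sketch (the helper name is just illustrative) that blocks until the stream is `ACTIVE` using the boto3 `stream_exists` waiter:
    ```
    import boto3
    
    
    def wait_for_stream_active(stream_name):
        # The stream_exists waiter polls DescribeStream until the stream is ACTIVE
        kinesis_client = boto3.client('kinesis')
        waiter = kinesis_client.get_waiter('stream_exists')
        waiter.wait(StreamName=stream_name)
        print(f"Kinesis stream '{stream_name}' is ACTIVE")
    
    
    wait_for_stream_active("stocks-stream")
    ```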
   
   # Step 2: Publish Dummy Data into the Input Stream
   ```
   
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "stocks-stream"
   
   
   def get_data():
       return {
           'event_time': datetime.datetime.now().isoformat(),
           'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
           'price': round(random.random() * 100, 2)
       }
   
   
   def generate(stream_name, kinesis_client, num_samples):
       for _ in range(num_samples):
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(data),
               PartitionKey="partitionkey")


    if __name__ == '__main__':
       num_samples = 10  # Change this to the desired number of samples
       generate(STREAM_NAME, boto3.client('kinesis'), num_samples)
   ```
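   
   One thing to double-check: the Flink source table in the Zeppelin paragraphs below declares a `uuid` column that serves as the Hudi record key, but the generator above does not emit a `uuid` field. A hedged variant of `get_data()` that also produces one (assuming the record key should be generated on the producer side) could look like this:
    ```
    import datetime
    import random
    import uuid
    
    
    def get_data():
        # Same payload as above, plus a 'uuid' field so records line up with the
        # uuid column declared on stocks_stream_table / stock_table_hudi
        # (assumption: the record key is generated on the producer side)
        return {
            'uuid': str(uuid.uuid4()),
            'event_time': datetime.datetime.now().isoformat(),
            'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
            'price': round(random.random() * 100, 2)
        }
    ```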
   
   # Step 3: Download and Upload the JARs
   ```
   
   import boto3
   import requests
   
   def download_and_upload_to_s3(url, bucket_name, s3_key):
       # Download the JAR file
       response = requests.get(url)
       if response.status_code == 200:
           jar_content = response.content
   
           # Upload to S3
           s3_client = boto3.client('s3')
            s3_client.put_object(Bucket=bucket_name, Key=s3_key, Body=jar_content)
           print(f"Uploaded {s3_key} to {bucket_name}")
       else:
           print(f"Failed to download {url}")
   
   if __name__ == "__main__":
       # URLs of the JAR files you want to download
        jar_urls = [
            "https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.15-bundle/0.13.1/hudi-flink1.15-bundle-0.13.1.jar",
            "https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.15.0/flink-s3-fs-hadoop-1.15.0.jar",
        ]
   
       # S3 bucket name and S3 keys (object keys) for uploaded JARs
       bucket_name = 'sample-backup-us-west-1'
       s3_keys = [
           'hudi-flink1.15-bundle-0.13.1.jar',
           'flink-s3-fs-hadoop-1.15.0.jar',
       ]
   
       for i, jar_url in enumerate(jar_urls):
           download_and_upload_to_s3(jar_url, bucket_name, s3_keys[i])
   
   ```
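   
   To confirm both JARs actually landed in the bucket before referencing them from Flink/Zeppelin, a quick check with `head_object` (a sketch reusing the same bucket and keys as above) can help:
    ```
    import boto3
    
    
    def verify_uploads(bucket_name, s3_keys):
        # head_object raises a ClientError if the object is missing
        s3_client = boto3.client('s3')
        for key in s3_keys:
            meta = s3_client.head_object(Bucket=bucket_name, Key=key)
            print(f"{key}: {meta['ContentLength']} bytes")
    
    
    verify_uploads('sample-backup-us-west-1',
                   ['hudi-flink1.15-bundle-0.13.1.jar', 'flink-s3-fs-hadoop-1.15.0.jar'])
    ```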
   
   # Flink SQL (enter these paragraphs in Zeppelin)
   ```
   %flink.ssql(type=update)
   
   DROP TABLE IF EXISTS stocks_stream_table;
   
   CREATE TABLE stocks_stream_table (
       uuid VARCHAR,
       ticker VARCHAR(6),
       price DOUBLE,
       event_time TIMESTAMP(3),
       WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
   )
   PARTITIONED BY (ticker)
   WITH (
       'connector' = 'kinesis',
       'stream' = 'stocks-stream',
       'aws.region' = 'us-west-1',
       'scan.stream.initpos' = 'TRIM_HORIZON',
       'format' = 'json',
       'json.timestamp-format.standard' = 'ISO-8601'
    );
   
   
   %flink.ssql(type=update)
   
   DROP TABLE IF EXISTS stock_table_hudi;
   
   CREATE TABLE IF NOT EXISTS stock_table_hudi (
       uuid VARCHAR PRIMARY KEY NOT ENFORCED,
       ticker VARCHAR,
       price DOUBLE,
       event_time TIMESTAMP(3)
   )
   PARTITIONED BY (ticker)
   WITH (
       'connector' = 'hudi',
       'path' = 's3a://sample-backup-us-west-1/tmp/',
       'table.type' = 'MERGE_ON_READ',
       'hoodie.embed.timeline.server' = 'false'
   );
   
   
    %flink.ssql(type=update)
   
   INSERT INTO stock_table_hudi 
   SELECT uuid, ticker, price, event_time FROM stocks_stream_table;
   
   
   ```
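   
   Once the INSERT job has been running for a while, one way to sanity-check that Hudi is writing to the table path (a sketch, assuming the same bucket and `tmp/` prefix as the `path` option above) is to list the objects and look for the `.hoodie` metadata folder and per-ticker partition directories:
    ```
    import boto3
    
    
    def list_hudi_table_files(bucket_name, prefix):
        # A healthy table path contains a .hoodie/ metadata folder plus
        # per-ticker partition directories with log/parquet files
        s3_client = boto3.client('s3')
        response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix, MaxKeys=50)
        for obj in response.get('Contents', []):
            print(obj['Key'])
    
    
    list_hudi_table_files('sample-backup-us-west-1', 'tmp/')
    ```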
   ![Screenshot 2023-09-25 at 6 17 30 PM](https://github.com/apache/hudi/assets/39345855/d3988782-5776-4009-8f8e-1b533687535f)
   
   ![Screenshot 2023-09-25 at 6 18 27 PM](https://github.com/apache/hudi/assets/39345855/195bba39-c369-4cf9-a27f-f28e3e7f7c3d)
   
   
   @danny0405 
   @dannyhchen 
   

