soumilshah1995 opened a new issue, #9783:
URL: https://github.com/apache/hudi/issues/9783
Steps
# Step 1 Create Kinesis Streams
```
import boto3
def create_kinesis_stream(stream_name, shard_count):
    """Create a Kinesis data stream and report the outcome.

    Args:
        stream_name: Name of the stream to create.
        shard_count: Number of shards to provision.
    """
    try:
        # Initialize the Kinesis client
        kinesis_client = boto3.client('kinesis')
        # Create the Kinesis stream
        response = kinesis_client.create_stream(
            StreamName=stream_name,
            ShardCount=shard_count
        )
        # Check for successful response
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            # The original paste split this f-string across two lines
            # (a syntax error); rejoined as adjacent string literals.
            print(f"Kinesis stream '{stream_name}' created with "
                  f"{shard_count} shard(s)")
        else:
            print("Failed to create Kinesis stream")
    except Exception as e:
        # NOTE(review): broad catch keeps this demo best-effort; in
        # production, catch botocore ClientError specifically.
        print(f"Error: {str(e)}")
def delete_kinesis_stream(stream_name):
    """Delete a Kinesis data stream and report the outcome.

    Args:
        stream_name: Name of the stream to delete.
    """
    try:
        # Initialize the Kinesis client
        kinesis_client = boto3.client('kinesis')
        # Delete the Kinesis stream
        response = kinesis_client.delete_stream(
            StreamName=stream_name
        )
        # Check for successful response
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            print(f"Kinesis stream '{stream_name}' deleted successfully")
        else:
            print("Failed to delete Kinesis stream")
    except Exception as e:
        # NOTE(review): broad catch keeps this demo best-effort; in
        # production, catch botocore ClientError specifically.
        print(f"Error: {str(e)}")
# Create the input stream used by the rest of the walkthrough (1 shard).
create_kinesis_stream("stocks-stream", 1)
```
# Step 2: Publish some Dummy Data into the Input Stream
```
import datetime
import json
import random
import boto3
STREAM_NAME = "stocks-stream"
def get_data():
    """Return one random stock-tick record.

    Returns:
        dict with keys 'event_time' (ISO-8601 timestamp string),
        'ticker' (one of five fixed symbols), and 'price' (float in
        [0, 100), rounded to 2 decimal places).
    """
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)
    }
def generate(stream_name, kinesis_client, num_samples):
    """Publish num_samples random stock records to a Kinesis stream.

    Args:
        stream_name: Target Kinesis stream name.
        kinesis_client: boto3 Kinesis client (or compatible object
            exposing put_record).
        num_samples: Number of records to publish.
    """
    for _ in range(num_samples):
        data = get_data()
        print(data)
        # All records share one partition key, so they land on a single
        # shard — fine for a 1-shard demo stream.
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")
if __name__ == '__main__':
    # Change this to the desired number of samples
    num_samples = 10
    generate(STREAM_NAME, boto3.client('kinesis'), num_samples)
```
# Step 3: Download the JARs and Upload Them to S3
```
import boto3
import requests
def download_and_upload_to_s3(url, bucket_name, s3_key):
    """Download a file over HTTP and upload it to an S3 bucket.

    Args:
        url: HTTP(S) URL of the file to fetch.
        bucket_name: Destination S3 bucket.
        s3_key: Object key to store the file under.
    """
    # Download the JAR file
    response = requests.get(url)
    if response.status_code == 200:
        jar_content = response.content
        # Upload to S3
        s3_client = boto3.client('s3')
        s3_client.put_object(Bucket=bucket_name, Key=s3_key,
                             Body=jar_content)
        print(f"Uploaded {s3_key} to {bucket_name}")
    else:
        print(f"Failed to download {url}")
if __name__ == "__main__":
    # URLs of the JAR files you want to download
    jar_urls = [
        "https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.15-bundle/0.13.1/hudi-flink1.15-bundle-0.13.1.jar",
        "https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.15.0/flink-s3-fs-hadoop-1.15.0.jar",
    ]
    # S3 bucket name and S3 keys (object keys) for uploaded JARs
    bucket_name = 'sample-backup-us-west-1'
    s3_keys = [
        'hudi-flink1.15-bundle-0.13.1.jar',
        'flink-s3-fs-hadoop-1.15.0.jar',
    ]
    # zip pairs each URL with its destination key directly, avoiding
    # manual index bookkeeping.
    for jar_url, s3_key in zip(jar_urls, s3_keys):
        download_and_upload_to_s3(jar_url, bucket_name, s3_key)
```
# Flink code (enter these paragraphs in Zeppelin)
```
%flink.ssql(type=update)
-- Source table: reads JSON stock events from the Kinesis stream,
-- with a 5-second watermark on event_time for late data.
DROP TABLE IF EXISTS stocks_stream_table;
CREATE TABLE stocks_stream_table (
-- NOTE(review): the Step-2 producer emits only event_time, ticker and
-- price — there is no 'uuid' field in the JSON payload, so this column
-- will be NULL for every row. Confirm where uuid is meant to come from.
uuid VARCHAR,
ticker VARCHAR(6),
price DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (ticker)
WITH (
'connector' = 'kinesis',
'stream' = 'stocks-stream',
'aws.region' = 'us-west-1',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)
%flink.ssql(type=update)
-- Sink table: Hudi Merge-On-Read table on S3, keyed by uuid and
-- partitioned by ticker. The embedded timeline server is disabled.
DROP TABLE IF EXISTS stock_table_hudi;
CREATE TABLE IF NOT EXISTS stock_table_hudi (
-- NOTE(review): uuid is the Hudi record key, but the upstream source
-- never populates it (the producer's JSON has no uuid field) — writes
-- keyed on a NULL value are likely the failure here; verify.
uuid VARCHAR PRIMARY KEY NOT ENFORCED,
ticker VARCHAR,
price DOUBLE,
event_time TIMESTAMP(3)
)
PARTITIONED BY (ticker)
WITH (
'connector' = 'hudi',
'path' = 's3a://sample-backup-us-west-1/tmp/',
'table.type' = 'MERGE_ON_READ',
'hoodie.embed.timeline.server' = 'false'
);
%ssql
-- Continuous streaming insert from the Kinesis source into the Hudi sink.
-- NOTE(review): the other paragraphs use '%flink.ssql(type=update)';
-- '%ssql' here may be a typo for that interpreter — verify.
INSERT INTO stock_table_hudi
SELECT uuid, ticker, price, event_time FROM stocks_stream_table;
```


@danny0405
@dannyhchen
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]