petersg opened a new issue, #4633:
URL: https://github.com/apache/iceberg/issues/4633

   Hello everyone,
   
   I am trying to adapt this example:
https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples/blob/master/python/S3Sink/streaming-file-sink.py
   
   But I want to store the output in Iceberg format instead. I've tried to follow this documentation:
   https://iceberg.apache.org/docs/latest/aws/
   specifically the Flink section. But I don't know how to get so many jars into KDA (Kinesis Data Analytics), because you can declare only one jar in the runtime, and in my case that jar is flink-sql-connector-kinesis (needed to be able to run Flink SQL). According to the documentation I also need iceberg-flink-runtime, the AWS SDK bundle, and url-connection-client.
   
   In the example provided in the Iceberg documentation it looks like they are running everything locally, but I need to do it from KDA.
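
   For what it's worth, the only mechanism I've found for pointing PyFlink at several jars at once is the `pipeline.jars` configuration with the paths separated by semicolons. This is just a sketch of what I mean (the file:// paths are placeholders, since I don't know where KDA would make the extra jars available, which is really the core of my question):

   ```python
   # Sketch only: registering several jars through pipeline.jars (semicolon-separated).
   # The file:// paths are placeholders; I don't know where KDA unpacks bundled files.
   from pyflink.table import EnvironmentSettings, StreamTableEnvironment

   env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
   table_env = StreamTableEnvironment.create(environment_settings=env_settings)

   jars = [
       "file:///opt/flink/lib/flink-sql-connector-kinesis_2.12-1.13.2.jar",  # placeholder path
       "file:///opt/flink/lib/iceberg-flink-runtime-0.12.1.jar",             # placeholder path
       "file:///opt/flink/lib/bundle-2.17.173.jar",                          # AWS SDK v2 bundle, placeholder path
       "file:///opt/flink/lib/url-connection-client-2.17.176.jar",           # placeholder path
   ]
   table_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jars))
   ```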
   
   I've tried this:
   
   ```python
   # -*- coding: utf-8 -*-
   
   """
   streaming-file-sink.py
   ~~~~~~~~~~~~~~~~~~~
   This module:
       1. Creates a table environment
       2. Creates a source table from a Kinesis Data Stream
       3. Creates a sink table writing to an S3 Bucket
       4. Queries from the Source Table and
          creates a tumbling window over 1 minute to calculate the average PRICE over the window.
       5. These tumbling window results are inserted into the Sink table (S3)
   """
   
   from pyflink.table import EnvironmentSettings, StreamTableEnvironment
   from pyflink.table.window import Tumble
   import os
   import json
   
   # 1. Creates a Table Environment
   env_settings = (
       EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
   )
   table_env = StreamTableEnvironment.create(environment_settings=env_settings)
   
   APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"  # on kda
   
   # Here I'm trying to somehow load these jars so that they can be used.
   table_env.add_jars("lib/iceberg-flink-runtime-0.12.1.jar")
   table_env.add_jars("lib/bundle-2.17.173.jar")
   table_env.add_jars("lib/url-connection-client-2.17.176.jar")
   
   is_local = (
       True if os.environ.get("IS_LOCAL") else False
   )  # set this env var in your local environment
   
   if is_local:
       # only for local: overwrite the properties file path and pass in your jars delimited by a semicolon (;)
       APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # local
   
       CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
       table_env.get_config().get_configuration().set_string(
           "pipeline.jars",
           "file:///"
           + CURRENT_DIR
           + "/lib/flink-sql-connector-kinesis_2.12-1.13.2.jar;file:///"
           + CURRENT_DIR
           + "/plugins/flink-s3-fs-hadoop-1.9.3.jar",
       )
   
       table_env.get_config().get_configuration().set_string(
           "execution.checkpointing.mode", "EXACTLY_ONCE"
       )
       table_env.get_config().get_configuration().set_string(
           "execution.checkpointing.interval", "1min"
       )
   
   
   def get_application_properties():
       if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
           with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
               contents = file.read()
               properties = json.loads(contents)
               return properties
       else:
           print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))
   
   
   def property_map(props, property_group_id):
       for prop in props:
           if prop["PropertyGroupId"] == property_group_id:
               return prop["PropertyMap"]
   
   
   def create_glue_catalog():
       return """CREATE CATALOG flink_iceberg_test WITH (
               'type'='iceberg',
               'warehouse'='s3://path/FlinkS3Ice-test/',
               'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
               'io-impl'='org.apache.iceberg.aws.s3.S3FileIO')"""
   
   def create_source_table(table_name, stream_name, region, stream_initpos):
       return """ CREATE TABLE {0} (
                   TICKER VARCHAR(6),
                   PRICE DOUBLE,
                   EVENT_TIME TIMESTAMP(3),
                   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '5' SECOND
                 )
                 PARTITIONED BY (TICKER)
                 WITH (
                   'connector' = 'kinesis',
                   'stream' = '{1}',
                   'aws.region' = '{2}',
                   'scan.stream.initpos' = '{3}',
                   'format' = 'json',
                   'json.timestamp-format.standard' = 'ISO-8601'
                 ) """.format(
           table_name, stream_name, region, stream_initpos
       )
   
   # Create the iceberg table
   def create_sink_table_2(table_name,bucket_name):
       # Catalog Name + DatabaseName + TableName
       return """ CREATE TABLE flink_iceberg_test.db_flink.{0} (
                   TICKER VARCHAR(6),
                   PRICE DOUBLE,
                   EVENT_TIME TIMESTAMP(3),
                   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '5' SECOND
               )
               PARTITIONED BY (TICKER)
               WITH ('type'='iceberg',
                   'write.format.default'='parquet',
                    'write.object-storage.enabled'='true', 
                   'write.data.path'='s3://{1}/FlinkS3Ice-test/data/')
               """.format(
           table_name, bucket_name)
   
   
   
   
   def perform_tumbling_window_aggregation(input_table_name):
       # use SQL Table in the Table API
       input_table = table_env.from_path(input_table_name)
   
       tumbling_window_table = (
           input_table.window(
               Tumble.over("1.minute").on("EVENT_TIME").alias("one_minute_window")
           )
           .group_by("TICKER, one_minute_window")
           .select("TICKER, PRICE.avg as PRICE, one_minute_window.end as EVENT_TIME")
       )
   
       return tumbling_window_table
   
   
   def main():
       # Application Property Keys
       input_property_group_key = "consumer.config.0"
       sink_property_group_key = "sink.config.0"
   
       input_stream_key = "input.stream.name"
       input_region_key = "aws.region"
       input_starting_position_key = "flink.stream.initpos"
   
       output_sink_key = "output.bucket.name"
   
       # tables
       input_table_name = "input_table"
       output_table_name = "output_table"
   
       # get application properties
       props = get_application_properties()
   
       input_property_map = property_map(props, input_property_group_key)
       output_property_map = property_map(props, sink_property_group_key)
   
       input_stream = input_property_map[input_stream_key]
       input_region = input_property_map[input_region_key]
       stream_initpos = input_property_map[input_starting_position_key]
   
       output_bucket_name = output_property_map[output_sink_key]
   
       # 0. Create glue catalog
       table_env.execute_sql(
           create_glue_catalog()
       )
   
       # 2. Creates a source table from a Kinesis Data Stream
       table_env.execute_sql(
           create_source_table(
               input_table_name, input_stream, input_region, stream_initpos
           )
       )
   
       # 3. Creates a sink table writing to an S3 Bucket
       create_sink = create_sink_table_2(
           output_table_name, output_bucket_name
       )
       table_env.execute_sql(create_sink)
   
       # 4. Queries from the Source Table and creates a tumbling window over 1 minute
       # to calculate the average PRICE over the window.
       tumbling_window_table = perform_tumbling_window_aggregation(input_table_name)
       table_env.create_temporary_view("tumbling_window_table", tumbling_window_table)
   
       # 5. These tumbling windows are inserted into the sink table (S3)
       table_result = table_env.execute_sql(
           "INSERT INTO {0} SELECT * FROM {1}".format(output_table_name, "tumbling_window_table")
       )
   
       print(table_result.get_job_client().get_job_status())
   
   
   if __name__ == "__main__":
       main()
   ```
   
   I've recreated the example to store the data in a plain S3 sink, and that works fine. But this approach needs a Glue catalog for Iceberg, and it seems the catalog is not being created. This code is supposed to create the resources, such as the table and the catalog, in Iceberg format, and according to what I have read nothing additional is needed.
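
   In case it is relevant, these are the checks I was planning to add after the CREATE CATALOG statement to confirm whether the Glue catalog is actually registered, plus an INSERT that uses the fully qualified sink name (that qualification is an assumption on my part, since the sink is created as `flink_iceberg_test.db_flink.output_table` but the script inserts into plain `output_table`):

   ```python
   # Sketch only: sanity checks around the Glue catalog registration.
   table_env.execute_sql("SHOW CATALOGS").print()   # expect 'flink_iceberg_test' in the list

   # Assumption on my part: the INSERT probably has to use the fully qualified
   # name, because the sink table lives in the Iceberg/Glue catalog, not in the
   # default catalog where the temporary view was registered.
   table_env.execute_sql(
       "INSERT INTO flink_iceberg_test.db_flink.output_table "
       "SELECT * FROM tumbling_window_table"
   )
   ```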
   
   Thank you for your comments and help.

