petersg opened a new issue, #4633: URL: https://github.com/apache/iceberg/issues/4633
Hello everyone, I am trying to adapt this example: https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples/blob/master/python/S3Sink/streaming-file-sink.py so that the data is stored in Iceberg format. I've tried to follow this documentation, specifically the Flink section: https://iceberg.apache.org/docs/latest/aws/ But I don't know how to load so many jars into KDA (Kinesis Data Analytics), because you can declare only one jar in the runtime, and in this case it is flink-sql-connector-kinesis (to be able to run Flink SQL). According to the documentation I also need iceberg-flink-runtime, the AWS SDK bundle and url-connection-client. In the example provided by the Iceberg documentation it looks like they are running locally, but I need to do it from KDA. I've tried this:

```python
# -*- coding: utf-8 -*-

"""
streaming-file-sink.py
~~~~~~~~~~~~~~~~~~~~~~
This module:
    1. Creates a table environment
    2. Creates a source table from a Kinesis Data Stream
    3. Creates a sink table writing to an S3 Bucket
    4. Queries from the Source Table and creates a tumbling window over 1 minute
       to calculate the average PRICE over the window.
    5. These tumbling window results are inserted into the Sink table (S3)
"""

import json
import os

from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.table.window import Tumble

# 1. Creates a Table Environment
env_settings = (
    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
)
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"  # on KDA

# Here I am trying to load these jars somehow so that they can be used.
table_env.add_jars("lib/iceberg-flink-runtime-0.12.1.jar")
table_env.add_jars("lib/bundle-2.17.173.jar")
table_env.add_jars("lib/url-connection-client-2.17.176.jar")

is_local = (
    True if os.environ.get("IS_LOCAL") else False
)  # set this env var in your local environment

if is_local:
    # only for local: overwrite the properties path and pass in your jars delimited by a semicolon (;)
    APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # local

    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis_2.12-1.13.2.jar;file:///" + CURRENT_DIR + "/plugins/flink-s3-fs-hadoop-1.9.3.jar",
    )
    table_env.get_config().get_configuration().set_string(
        "execution.checkpointing.mode", "EXACTLY_ONCE"
    )
    table_env.get_config().get_configuration().set_string(
        "execution.checkpointing.interval", "1min"
    )


def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))


def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]


def create_glue_catalog():
    return """CREATE CATALOG flink_iceberg_test WITH (
        'type' = 'iceberg',
        'warehouse' = 's3://path/FlinkS3Ice-test/',
        'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
        'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO')"""


def create_source_table(table_name, stream_name, region, stream_initpos):
    return """CREATE TABLE {0} (
        TICKER VARCHAR(6),
        PRICE DOUBLE,
        EVENT_TIME TIMESTAMP(3),
        WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '5' SECOND
    )
    PARTITIONED BY (TICKER)
    WITH (
        'connector' = 'kinesis',
        'stream' = '{1}',
        'aws.region' = '{2}',
        'scan.stream.initpos' = '{3}',
        'format' = 'json',
        'json.timestamp-format.standard' = 'ISO-8601'
    )""".format(
        table_name, stream_name, region, stream_initpos
    )


# Create the Iceberg table
def create_sink_table_2(table_name, bucket_name):
    # Catalog name + database name + table name
    return """CREATE TABLE flink_iceberg_test.db_flink.{0} (
        TICKER VARCHAR(6),
        PRICE DOUBLE,
        EVENT_TIME TIMESTAMP(3),
        WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '5' SECOND
    )
    PARTITIONED BY (TICKER)
    WITH (
        'type' = 'iceberg',
        'write.format.default' = 'parquet',
        'write.object-storage.enabled' = 'true',
        'write.data.path' = 's3://{1}/FlinkS3Ice-test/data/'
    )""".format(
        table_name, bucket_name
    )


def perform_tumbling_window_aggregation(input_table_name):
    # use the SQL table in the Table API
    input_table = table_env.from_path(input_table_name)

    tumbling_window_table = (
        input_table.window(
            Tumble.over("1.minute").on("EVENT_TIME").alias("one_minute_window")
        )
        .group_by("TICKER, one_minute_window")
        .select("TICKER, PRICE.avg as PRICE, one_minute_window.end as EVENT_TIME")
    )

    return tumbling_window_table


def main():
    # Application property keys
    input_property_group_key = "consumer.config.0"
    sink_property_group_key = "sink.config.0"

    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    input_starting_position_key = "flink.stream.initpos"

    output_sink_key = "output.bucket.name"

    # tables
    input_table_name = "input_table"
    output_table_name = "output_table"

    # get application properties
    props = get_application_properties()

    input_property_map = property_map(props, input_property_group_key)
    output_property_map = property_map(props, sink_property_group_key)

    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]

    output_bucket_name = output_property_map[output_sink_key]

    # 0. Create the Glue catalog
    table_env.execute_sql(create_glue_catalog())

    # 2. Creates a source table from a Kinesis Data Stream
    table_env.execute_sql(
        create_source_table(input_table_name, input_stream, input_region, stream_initpos)
    )

    # 3. Creates a sink table writing to an S3 bucket
    create_sink = create_sink_table_2(output_table_name, output_bucket_name)
    table_env.execute_sql(create_sink)

    # 4. Queries from the source table and creates a tumbling window over 1 minute
    #    to calculate the average PRICE over the window.
    tumbling_window_table = perform_tumbling_window_aggregation(input_table_name)
    table_env.create_temporary_view("tumbling_window_table", tumbling_window_table)

    # 5. These tumbling windows are inserted into the sink table (S3)
    table_result = table_env.execute_sql(
        "INSERT INTO {0} SELECT * FROM {1}".format(output_table_name, "tumbling_window_table")
    )
    print(table_result.get_job_client().get_job_status())


if __name__ == "__main__":
    main()
```
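Regarding the jar loading at the top of the script, the only other mechanism I can think of is to extend the same `pipeline.jars` setting that the local branch already uses so that it also lists the Iceberg runtime, the AWS SDK bundle and url-connection-client. This is just a sketch of what I mean; the jar names and versions are the ones from my script, and the `lib/` layout is only my assumption about how the packaged application would be unpacked on KDA:

```python
import os

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

# Assumption: all four jars are shipped in a lib/ folder next to this script.
CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
jars = [
    "flink-sql-connector-kinesis_2.12-1.13.2.jar",
    "iceberg-flink-runtime-0.12.1.jar",
    "bundle-2.17.173.jar",                 # AWS SDK v2 bundle
    "url-connection-client-2.17.176.jar",
]

# pipeline.jars takes a semicolon-separated list of jar URLs.
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    ";".join("file://" + os.path.join(CURRENT_DIR, "lib", jar) for jar in jars),
)
```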
I've recreated the original example to store the data in a plain S3 sink and it works fine. But this Iceberg approach needs the Glue catalog, and it seems that the catalog is not being created. This code is supposed to create the resources, such as the catalog and the table, in Iceberg format, and according to what I have read nothing additional should be needed. Thank you for your comments and help.
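In case it helps with debugging, this is how I was planning to check from inside the job whether the catalog and database actually get registered. Just a sketch: `flink_iceberg_test` and `db_flink` are the names from my script above, and I noticed the script never explicitly creates the `db_flink` database, so I would add that too:

```python
# Sketch: verify that the Iceberg/Glue catalog was registered before creating tables.
print(table_env.list_catalogs())  # should include 'flink_iceberg_test'
table_env.execute_sql("SHOW CATALOGS").print()

# Switch to the catalog and make sure the target database exists.
table_env.execute_sql("USE CATALOG flink_iceberg_test")
table_env.execute_sql("CREATE DATABASE IF NOT EXISTS db_flink")
table_env.execute_sql("SHOW DATABASES").print()
```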
