
def run():
    """Build and run the core sales-transaction pipeline.

    Stages, in the order they are wired below:

    1. A single-element PCollection is pushed through ``cache_for_bigquery``
       and later handed to the item-code mapper as a list side input.
    2. Messages are read from the BOR and ITG Pub/Sub subscriptions and
       flattened into one PCollection.
    3. ``arch_mapper_fn`` and ``manual_modifier_transaction_key`` are applied;
       the latter produces the tagged outputs ``final_transaction`` and
       ``delelted_transaction``.
    4. The ``final_transaction`` output goes through ``item_code_mapper_fn``
       and ``manual_modifier_transaction_property`` (same two tagged outputs),
       then through the promotion, validation-rule and Gfk-type decorators.
    5. The decorated records and both ``delelted_transaction`` outputs are
       flattened again and appended to the BigQuery table
       ``TABLE_PREFIX + 'sales_transaction'``.

    The pipeline is created in a ``with`` block, so it is run when that
    block exits.
    """
    import json

    def write_to_bigquery_arch_Fn(Pcolection_arch, destination_table):
        """Append a PCollection to the BigQuery table ``destination_table``.

        Args:
            Pcolection_arch: PCollection holding the records to write.
            destination_table: Table name without prefix. It is also the base
                name of the schema file
                ``bigquery_schemas/<destination_table>.json``, whose
                ``"schema"`` entry supplies the table schema.
        """
        print("step at ", destination_table)
        # Read the schema through a context manager so the file handle is
        # closed deterministically instead of being left to the GC.
        with open("bigquery_schemas/" + destination_table + ".json") as schema_file:
            table_data_type = json.load(schema_file)["schema"]
        table_schema = parse_table_schema_from_json(json.dumps(table_data_type))
        # NOTE(review): this step uses print_fn() while the "END RESULT" step
        # below uses printfn() - confirm both names are defined and intended.
        (
            Pcolection_arch
            | "sap schemas change" + destination_table >> beam.ParDo(
                schemas_change(table_data_type))
            | "print data" >> beam.ParDo(print_fn())
            | "Write to BQ " + destination_table >> beam.io.WriteToBigQuery(
                TABLE_PREFIX + destination_table,
                schema=table_schema,
                create_disposition='CREATE_IF_NEEDED',
                write_disposition='WRITE_APPEND',
            )
        )

    options = PipelineOptions()
    with beam.Pipeline(options=options) as pipeline:
        # One dummy element drives a single pass through cache_for_bigquery;
        # its output is consumed below as a side input.
        query_side_input = (
            pipeline
            | 'create cache' >> beam.Create(['1'])
            | "cache for item" >> beam.ParDo(
                cache_for_bigquery('bi-prod-000001', str_query_store))
        )

        bor_data_source = (
            pipeline
            | "Read from BOR system" >> beam.io.gcp.pubsub.ReadFromPubSub(
                subscription=BOR_SUBCRIPTION)
        )
        itg_data_source = (
            pipeline
            | "Read from ITG system" >> beam.io.gcp.pubsub.ReadFromPubSub(
                subscription=ITG_SUBCRIPTION)
        )

        # Merge both sources, map them, then split on the transaction key
        # into the 'final_transaction' / 'delelted_transaction' outputs.
        key_modification_result = (
            (bor_data_source, itg_data_source)
            | "Gather " >> beam.Flatten()
            | "Core mapper" >> beam.ParDo(arch_mapper_fn())
            | "Transaction Key Manual Modifier" >> beam.ParDo(
                manual_modifier_transaction_key()
            ).with_outputs('final_transaction', 'delelted_transaction')
        )

        # Only the 'final_transaction' output continues through item-code
        # mapping (with the cache as a list side input) and the second split.
        property_modification_result = (
            key_modification_result.final_transaction
            | "Item code mapper" >> beam.ParDo(
                item_code_mapper_fn(),
                pvalue.AsList(query_side_input),
            )
            | "Property Modifier" >> beam.ParDo(
                manual_modifier_transaction_property()
            ).with_outputs('final_transaction', 'delelted_transaction')
        )

        enriched_transactions = (
            property_modification_result.final_transaction
            | "promotion decorator" >> beam.ParDo(promotion_type_decorator_fn())
            | "Validate Rule" >> beam.ParDo(validation_rule_decorator_fn())
            | "Gfk Type decorator" >> beam.ParDo(gfk_type_decorator_fn())
        )

        # Recombine the decorated records with both 'delelted_transaction'
        # outputs so that all of them reach the same sink.
        final_transactions = (
            (
                enriched_transactions,
                property_modification_result.delelted_transaction,
                key_modification_result.delelted_transaction,
            )
            | beam.Flatten()
            | "END RESULT" >> beam.ParDo(printfn())
        )

        write_to_bigquery_arch_Fn(final_transactions, 'sales_transaction')


# Script entry point: print a start-up banner, then build and run the pipeline.
if __name__ == "__main__":
    start_banner = "START CORE TRANSACTION PIPELINES"
    print(start_banner)
    run()
