[ https://issues.apache.org/jira/browse/BEAM-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

MOHIL updated BEAM-10339:
-------------------------
    Description: 
I am using Beam Java 2.19.0 on Google Dataflow.
  
 The machine type is the default, which is n1-standard-4.
 I didn't set numberOfWorkerHarnessThreads (I believe Beam/Dataflow picks it based on 
the number of cores available).
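
For reference, here is a minimal sketch of how both settings could be pinned explicitly (I am not setting either today; the option names are from the Dataflow runner's DataflowPipelineOptions/DataflowPipelineDebugOptions, as I understand them):

    // Sketch only: pinning the machine type and harness thread count explicitly.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    options.setWorkerMachineType("n1-standard-4"); // the default I currently get
    options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(4);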
 I recently added 3-4 new PTransforms to an existing pipeline, in which I read data 
from BQ for a certain timestamp and create a PCollectionView<Map<Key, Value>> to be 
used as side input in other PTransforms.
  
 I already had similar PTransforms earlier and the pipeline was running fine. The 
moment I added the new ones, the pipeline failed to start with {color:#ff0000}PC: 
@ 0x7f7490e08602 (unknown) raise{color} and {color:#ff0000}Check failure stack trace: 
***{color} exceptions.
  
 Please find attached screenshots of the various logs that I collected via worker 
logs and Error Reporting, as well as the worker logs (for a certain time interval) 
that I downloaded.
  
 *Note: When I enabled streamingEngine (keeping the machine type at its default, 
which is n1-standard-2 for Streaming Engine), the issue went away and the pipeline 
started successfully.*
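
For reference, this is how I enabled it, as far as I understand the relevant option (on older SDKs the equivalent experiment flags apply):

    // Sketch of the Streaming Engine toggle used for the run that worked:
    options.setEnableStreamingEngine(true);
    // or, via experiments:
    // --experiments=enable_streaming_engine --experiments=enable_windmill_service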
  
 Here are code snippets that should give an idea of the kinds of transforms and 
windowing I am using:
  
 1. Code that listens for a trigger on a Pub/Sub topic:

    /**
     * Read from Pub/Sub for topic ANALYTICS_UPDATE and create a
     * PCollection<POJORepresentingJobCompleteInfo> indicating that the main pipeline
     * should reload the relevant DataAnalyticsData from the BQ table.
     */
    static class MonitorPubSubForDailyAnalyticsDataStatus
            extends PTransform<PBegin, PCollection<POJORepresentingJobCompleteInfo>> {

        private final String subscriptionName;
        private final String jobProject;

        MonitorPubSubForDailyAnalyticsDataStatus(String subscriptionName, String jobProject) {
            this.subscriptionName = subscriptionName;
            this.jobProject = jobProject;
        }

        @Override
        public PCollection<POJORepresentingJobCompleteInfo> expand(PBegin input) {
            return input.getPipeline()
                .apply("Read_PubSub_Messages",
                    PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(subscriptionName))
                .apply("Applying_Windowing", Window.<PubsubMessage>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .discardingFiredPanes())
                .apply("Read_Update_Status",
                    ParDo.of(new DoFn<PubsubMessage, POJORepresentingJobCompleteInfo>() {
                        @ProcessElement
                        public void processElement(@Element PubsubMessage input,
                                OutputReceiver<POJORepresentingJobCompleteInfo> out) {
                            /*** Read the message and create the POJO (elided here) ***/
                            out.output(POJORepresentingJobCompleteInfo);
                        }
                    }));
        }
    }

2. Get the latest updates and reload them from various BQ tables using the Google 
Cloud BigQuery client library 
([https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries]):

    PCollection<POJORepresentingJobCompleteInfo> analyticsDataStatusUpdates =
        p.apply("Get_Analytics_Data_Status_Updates_pubsub",
            new MonitorPubSubForDailyAnalyticsDataStatus(subscriptionName, jobProject));

3. Create various PCollectionViews to be used as side inputs for decorating the 
stream of logs coming from Kafka (shown later):

    PCollectionView<Map<Stats1Key, Stats1>> Stats1View =
        analyticsDataStatusUpdates
            .apply("Reload_Stats1_FromBQ", new ReadStats1())
            .apply("View_Stats1", View.asSingleton());

    PCollectionView<Map<Stats2Key, Stats2>> Stats2View =
        analyticsDataStatusUpdates
            .apply("Reload_Stats2_FromBQ", new ReadStats2())
            .apply("View_Stats2", View.asSingleton());

    ... and so on.
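
Since every view follows the same pattern, the repetition could be folded into a small generic helper; this is only a sketch (statsView is a name I made up, not code from my pipeline):

    // Hypothetical helper: each view is "apply the reader, then View.asSingleton()",
    // differing only in the step names and the ReadStatsN transform.
    static <K, V> PCollectionView<Map<K, V>> statsView(
            PCollection<POJORepresentingJobCompleteInfo> updates,
            String name,
            PTransform<PCollection<POJORepresentingJobCompleteInfo>, PCollection<Map<K, V>>> reader) {
        return updates
            .apply("Reload_" + name + "_FromBQ", reader)
            .apply("View_" + name, View.<Map<K, V>>asSingleton());
    }

    // e.g. Stats1View = statsView(analyticsDataStatusUpdates, "Stats1", new ReadStats1());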

4. An example of the code that reads stats from BQ, i.e. ReadStats1(), ReadStats2() 
and so on:

    class ReadStats1 extends PTransform<PCollection<POJORepresentingJobCompleteInfo>,
            PCollection<Map<Stats1Key, Stats1>>> {

        @Override
        public PCollection<Map<Stats1Key, Stats1>> expand(
                PCollection<POJORepresentingJobCompleteInfo> input) {
            return input
                .apply("Read_From_BigQuery", ParDo.of(new BigQueryRead()))
                .apply("Applying_Windowing", Window.<Map<Stats1Key, Stats1>>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .discardingFiredPanes());
        }

        private class BigQueryRead extends DoFn<POJORepresentingJobCompleteInfo,
                Map<Stats1Key, Stats1>> {

            @ProcessElement
            public void process(@Element POJORepresentingJobCompleteInfo input, ProcessContext c) {
                Map<Stats1Key, Stats1> resultMap = new HashMap<>();
                try {
                    BigQuery bigQueryClient = BigQueryOptions.getDefaultInstance().getService();
                    // Some method that returns the desired SQL query based on info in the input.
                    String sqlQuery = getSqlQuery(input);
                    QueryJobConfiguration queryJobConfiguration =
                        QueryJobConfiguration.newBuilder(sqlQuery).setUseLegacySql(false).build();

                    // Create a job ID so that we can safely retry.
                    JobId jobId = JobId.of(UUID.randomUUID().toString());
                    Job queryJob = bigQueryClient.create(
                        JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build());

                    // Wait for the query to complete.
                    queryJob = queryJob.waitFor();
                    if (queryJob == null) {
                        logger.p1Error("Big Query Job no longer exists");
                    } else if (queryJob.getStatus().getError() != null) {
                        // You can also look at queryJob.getStatus().getExecutionErrors() for all
                        // errors, not just the latest one.
                        logger.p1Error("Big Query job returned error: {}",
                            queryJob.getStatus().getError().toString());
                    } else {
                        // Successful case.
                        logger.info("Parsing results executed by BigQuery");
                        // Get the results.
                        TableResult result = queryJob.getQueryResults();
                        if (null == result || !result.iterateAll().iterator().hasNext()) {
                            logger.info("No data found for query: {}", sqlQuery);
                        } else {
                            // Iterate over all pages of the results.
                            for (FieldValueList row : result.iterateAll()) {
                                /*** Parse the row and create Stats1Key and Stats1 from it ***/
                                resultMap.put(key, stats);
                            }
                        }
                    }
                } catch (Exception ex) {
                    logger.p1Error("Error in executing sql query against Big Query", ex);
                }
                logger.info("Emitting map of size: {}", resultMap.size());
                c.output(resultMap);
            }
        }
    }

    As mentioned before, all the classes ReadStats1(), ReadStats2(), etc. follow the 
above code design.

5. Using KafkaIO, we read a continuous stream of data from Kafka:

    PCollection<POJO> Logs =
        p
            .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
                .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
                .withTopic("topic")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                .withConsumerConfigUpdates(kafkaConsumerProperties)
                .withConsumerFactoryFn(consumerFactoryObj)
                .commitOffsetsInFinalize())
            .apply("Applying_Fixed_Window_Logs", Window.<KafkaRecord<String, byte[]>>into(
                    FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterPane.elementCountAtLeast(1))))
                .withAllowedLateness(Duration.standardDays(1))
                .discardingFiredPanes())
            .apply("Convert_KafkaRecord_To_PCollection<POJO>",
                ParDo.of(new ParseLogs()));
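
ParseLogs is elided above; roughly, it has the following shape (sketched here only so the types line up; deserialize(...) stands in for my actual parsing):

    // Rough shape of ParseLogs: turn each KafkaRecord's byte[] payload into a POJO.
    static class ParseLogs extends DoFn<KafkaRecord<String, byte[]>, POJO> {
        @ProcessElement
        public void processElement(@Element KafkaRecord<String, byte[]> record,
                OutputReceiver<POJO> out) {
            byte[] payload = record.getKV().getValue();
            out.output(deserialize(payload)); // actual deserialization elided in this report
        }
    }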

6. Take these logs and apply another transform, providing the aforementioned BQ 
reads as side inputs, i.e. something like this:

    Logs.apply("decorate", new Decorate().withSideInput(Stats1View, Stats2View, ...));
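
Decorate is my own transform; internally it amounts to a ParDo that attaches the views with withSideInputs and reads them per element, roughly like this sketch (not the literal Decorate implementation):

    // Rough shape of the decorate step: views attached via withSideInputs and
    // read per element with c.sideInput(...).
    PCollection<POJO> decorated = Logs.apply("decorate",
        ParDo.of(new DoFn<POJO, POJO>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                Map<Stats1Key, Stats1> stats1 = c.sideInput(Stats1View);
                Map<Stats2Key, Stats2> stats2 = c.sideInput(Stats2View);
                // ... enrich c.element() using the maps, then emit ...
                c.output(c.element());
            }
        }).withSideInputs(Stats1View, Stats2View));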

*Please note: I tried commenting out the code where I added the side inputs to the 
above transform and still ran into the same crash. So the issue is definitely in 
adding more than a certain number of PCollectionView transforms. I already had 3-4 
such transforms and they were working fine. Yesterday I added a few more and started 
seeing crashes.*

If I enable just one of the newly added PCollectionView transforms (keeping the old 
3-4 intact), everything works fine. The moment I enable another new transform, the 
crash happens.

 

*It would be good to know how Streaming Engine solved the problem.*

> Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: 
> ***
> -------------------------------------------------------------------------------
>
>                 Key: BEAM-10339
>                 URL: https://issues.apache.org/jira/browse/BEAM-10339
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow, sdk-java-core
>    Affects Versions: 2.19.0
>            Reporter: MOHIL
>            Priority: P1
>         Attachments: 
> dataflow_step_job_id_2020-06-25_01_11_47-8571841088706513966__logs__2020-06-25T01-15.json,
>  logging-errSyncingPod.png, logging-result1.png, logging-result2.png, 
> logging-result3.png, logging-result4.png, logging-result5.png, 
> logging-result6.png, nonworking-workerlogs.png, screenshot1.png, 
> screenshot2.png, working-workerlogs.png
>
>


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
