[ 
https://issues.apache.org/jira/browse/BEAM-671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía updated BEAM-671:
------------------------------
    Description: 
Today, almost every IO's {{Read}} {{PTransform}} that uses an 
{{UnboundedSource}} does the following in its {{apply()}} method:
{code:java}
      org.apache.beam.sdk.io.Read.Unbounded<byte[]> unbounded =
          org.apache.beam.sdk.io.Read.from(getSource());

      PTransform<PBegin, PCollection<byte[]>> transform = unbounded;

      if (maxNumRecords() < Long.MAX_VALUE || maxReadTime() != null) {
        transform =
            unbounded.withMaxReadTime(maxReadTime()).withMaxNumRecords(maxNumRecords());
      }

      return input.getPipeline().apply(transform);
{code}
To avoid duplicating this code in every IO, it would make sense to do it by 
default in {{org.apache.beam.sdk.io.Read}}.

  was:
Today, almost every IO's {{Read}} {{PTransform}} that uses an 
{{UnboundedSource}} does the following in its {{apply()}} method:

{code}
      org.apache.beam.sdk.io.Read.Unbounded<byte[]> unbounded =
          org.apache.beam.sdk.io.Read.from(getSource());

      PTransform<PBegin, PCollection<byte[]>> transform = unbounded;

      if (maxNumRecords != Long.MAX_VALUE) {
        transform = unbounded.withMaxNumRecords(maxNumRecords);
      } else if (maxReadTime != null) {
        transform = unbounded.withMaxReadTime(maxReadTime);
      }

      return input.getPipeline().apply(transform);
{code}

To avoid duplicating this code in every IO, it would make sense to do it by 
default in {{org.apache.beam.sdk.io.Read}}.


> Update Read PTransform to implicitly use maxNumRecords and maxReadTime
> ----------------------------------------------------------------------
>
>                 Key: BEAM-671
>                 URL: https://issues.apache.org/jira/browse/BEAM-671
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>
> Today, almost every IO's {{Read}} {{PTransform}} that uses an 
> {{UnboundedSource}} does the following in its {{apply()}} method:
> {code:java}
>       org.apache.beam.sdk.io.Read.Unbounded<byte[]> unbounded =
>           org.apache.beam.sdk.io.Read.from(getSource());
>       PTransform<PBegin, PCollection<byte[]>> transform = unbounded;
>       if (maxNumRecords() < Long.MAX_VALUE || maxReadTime() != null) {
>         transform =
>             unbounded.withMaxReadTime(maxReadTime()).withMaxNumRecords(maxNumRecords());
>       }
>       return input.getPipeline().apply(transform);
> {code}
> To avoid duplicating this code in every IO, it would make sense to do it 
> by default in {{org.apache.beam.sdk.io.Read}}.
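
A minimal, self-contained sketch of the idea in the issue, under stated assumptions: it is not Beam's implementation and does not depend on Beam. {{UnboundedRead}} and {{readWithLimits}} are hypothetical stand-ins invented for illustration; only the method names {{withMaxNumRecords}} / {{withMaxReadTime}} and the limit check mirror the snippet quoted above. It shows the conditional written once in a shared place instead of in every IO.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

public class ReadLimitsSketch {

    /** Hypothetical stand-in for an unbounded read. This is NOT the Beam API. */
    static final class UnboundedRead<T> {
        private final Iterator<T> source;
        private final long maxNumRecords;   // Long.MAX_VALUE means "no record limit"
        private final Duration maxReadTime; // null means "no time limit"

        private UnboundedRead(Iterator<T> source, long maxNumRecords, Duration maxReadTime) {
            this.source = source;
            this.maxNumRecords = maxNumRecords;
            this.maxReadTime = maxReadTime;
        }

        static <T> UnboundedRead<T> from(Iterator<T> source) {
            return new UnboundedRead<>(source, Long.MAX_VALUE, null);
        }

        UnboundedRead<T> withMaxNumRecords(long n) {
            return new UnboundedRead<>(source, n, maxReadTime);
        }

        UnboundedRead<T> withMaxReadTime(Duration d) {
            return new UnboundedRead<>(source, maxNumRecords, d);
        }

        boolean isBounded() {
            return maxNumRecords < Long.MAX_VALUE || maxReadTime != null;
        }

        /** Reads until a limit is hit; refuses to drain a source with no limit. */
        List<T> read() {
            if (!isBounded()) {
                throw new IllegalStateException("no limit set on an unbounded source");
            }
            long start = System.nanoTime();
            List<T> out = new ArrayList<>();
            while (out.size() < maxNumRecords
                    && source.hasNext()
                    && (maxReadTime == null
                        || System.nanoTime() - start < maxReadTime.toNanos())) {
                out.add(source.next());
            }
            return out;
        }
    }

    /** The conditional each IO currently repeats, written once. */
    static <T> UnboundedRead<T> readWithLimits(
            Iterator<T> source, long maxNumRecords, Duration maxReadTime) {
        UnboundedRead<T> read = UnboundedRead.from(source);
        if (maxNumRecords < Long.MAX_VALUE || maxReadTime != null) {
            read = read.withMaxReadTime(maxReadTime).withMaxNumRecords(maxNumRecords);
        }
        return read;
    }

    public static void main(String[] args) {
        // An endless stream of integers stands in for an unbounded source.
        Iterator<Integer> naturals = Stream.iterate(0, i -> i + 1).iterator();
        System.out.println(readWithLimits(naturals, 3, null).read()); // prints [0, 1, 2]
    }
}
```

In this sketch an IO would call {{readWithLimits(source, maxNumRecords, maxReadTime)}} and no longer carry its own copy of the check. Whether the real change belongs in {{Read}} itself or in a shared helper is left open by the issue.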



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
