[ https://issues.apache.org/jira/browse/BEAM-6526?focusedWorklogId=192266&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-192266 ]

ASF GitHub Bot logged work on BEAM-6526:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Jan/19 13:17
            Start Date: 30/Jan/19 13:17
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on pull request #7672: [BEAM-6526] Add ReadFiles transform for AvroIO
URL: https://github.com/apache/beam/pull/7672#discussion_r252250756
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
 ##########
 @@ -705,6 +733,93 @@ public CreateSourceFn(Class<T> recordClass, String jsonSchema) {
 
   /////////////////////////////////////////////////////////////////////////////
 
+  /** Implementation of {@link #readFiles}. */
+  @AutoValue
+  public abstract static class ReadFiles<T>
+      extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
+    abstract MatchConfiguration getMatchConfiguration();
+
+    @Nullable
+    abstract Class<T> getRecordClass();
+
+    @Nullable
+    abstract Schema getSchema();
+
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract boolean getInferBeamSchema();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
+
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
+      abstract ReadFiles<T> build();
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public ReadFiles<T> withMatchConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setMatchConfiguration(configuration).build();
+    }
+
+    /** Like {@link Read#withEmptyMatchTreatment}. */
+    public ReadFiles<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /** Like {@link Read#watchForNewFiles}. */
+    @Experimental(Kind.SPLITTABLE_DO_FN)
+    public ReadFiles<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withMatchConfiguration(
+          getMatchConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    @VisibleForTesting
+    ReadFiles<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    /**
+     * If set to true, a Beam schema will be inferred from the AVRO schema. This allows the output
+     * to be used by SQL and by the schema-transform library.
+     */
+    @Experimental(Kind.SCHEMAS)
+    public ReadFiles<T> withBeamSchemas(boolean withBeamSchemas) {
 
 Review comment:
   Yes, it makes sense for ParquetIO too. Worth filing a JIRA, IMO. Note that I only created that method to be consistent with the existing `read()` and `readAll()` signatures.
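The `ReadFiles` class in the diff above uses the AutoValue immutable-builder idiom. Each `with*` method copies the current value through `toBuilder()`, changes one property, and calls `build()`. Convenience methods such as `withEmptyMatchTreatment` delegate to `withMatchConfiguration`.

The sketch below is a hand-written, JDK-only approximation of that idiom, for illustration only. `EmptyMatch`, `MatchConfig`, and `ReadFilesConfig` are hypothetical names, not Beam classes. In Beam, AutoValue would generate most of this boilerplate.

```java
import java.util.Objects;

// Hypothetical stand-in for Beam's EmptyMatchTreatment enum.
enum EmptyMatch { ALLOW, DISALLOW, ALLOW_IF_WILDCARD }

// Hypothetical stand-in for MatchConfiguration: an immutable value type.
final class MatchConfig {
  private final EmptyMatch emptyMatch;

  MatchConfig(EmptyMatch emptyMatch) {
    this.emptyMatch = Objects.requireNonNull(emptyMatch);
  }

  EmptyMatch getEmptyMatch() {
    return emptyMatch;
  }

  // Returns a modified copy; the receiver is never mutated.
  MatchConfig withEmptyMatch(EmptyMatch treatment) {
    return new MatchConfig(treatment);
  }
}

// Hand-written equivalent of an @AutoValue class with an @AutoValue.Builder.
final class ReadFilesConfig {
  private final MatchConfig matchConfig;
  private final boolean inferBeamSchema;

  private ReadFilesConfig(Builder b) {
    this.matchConfig = Objects.requireNonNull(b.matchConfig, "matchConfig");
    this.inferBeamSchema = b.inferBeamSchema;
  }

  MatchConfig getMatchConfig() { return matchConfig; }

  boolean getInferBeamSchema() { return inferBeamSchema; }

  static Builder builder() { return new Builder(); }

  // Copies every property into a fresh builder, as AutoValue's toBuilder() does.
  Builder toBuilder() {
    return new Builder().setMatchConfig(matchConfig).setInferBeamSchema(inferBeamSchema);
  }

  ReadFilesConfig withMatchConfig(MatchConfig config) {
    return toBuilder().setMatchConfig(config).build();
  }

  // Convenience method that delegates, mirroring withEmptyMatchTreatment in the diff.
  ReadFilesConfig withEmptyMatchTreatment(EmptyMatch treatment) {
    return withMatchConfig(getMatchConfig().withEmptyMatch(treatment));
  }

  ReadFilesConfig withBeamSchemas(boolean infer) {
    return toBuilder().setInferBeamSchema(infer).build();
  }

  static final class Builder {
    private MatchConfig matchConfig;
    private boolean inferBeamSchema;

    Builder setMatchConfig(MatchConfig config) {
      this.matchConfig = config;
      return this;
    }

    Builder setInferBeamSchema(boolean infer) {
      this.inferBeamSchema = infer;
      return this;
    }

    ReadFilesConfig build() {
      return new ReadFilesConfig(this);
    }
  }
}
```

Every call returns a new object, so a base configuration can be shared and specialized without affecting other users of it. For example, `base.withEmptyMatchTreatment(EmptyMatch.ALLOW).withBeamSchemas(true)` leaves `base` unchanged.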
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 192266)
    Time Spent: 20m  (was: 10m)

> Add ReadFiles transform for AvroIO
> ----------------------------------
>
>                 Key: BEAM-6526
>                 URL: https://issues.apache.org/jira/browse/BEAM-6526
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-avro
>            Reporter: Ismaël Mejía
>            Assignee: Ismaël Mejía
>            Priority: Minor
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> AvroIO lacks a `readFiles()` method, which it needs to be fully composable with FileIO
> the way other file-based IOs, e.g. TextIO and ParquetIO, already are.
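The composability the issue asks for comes from splitting reading into two steps. The first step matches file names; in Beam this is `FileIO.match()` followed by `FileIO.readMatches()`. The second step parses each matched file, which is the role of the proposed `readFiles()`. Because the parsing step does not care how the files were found, it accepts files from any upstream source.

The JDK-only sketch below illustrates that split on the local filesystem. It rests on a few assumptions:

- `MatchThenRead`, `matchFiles`, `readFiles`, and `readLines` are hypothetical helpers, not Beam APIs.
- Plain text lines stand in for Avro records.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

final class MatchThenRead {
  // Step 1 (hypothetical): expand a glob within one directory into concrete paths.
  static List<Path> matchFiles(Path dir, String glob) throws IOException {
    List<Path> matches = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
      for (Path p : stream) {
        matches.add(p);
      }
    }
    Collections.sort(matches); // deterministic order for callers
    return matches;
  }

  // Step 2 (hypothetical): parse every given file with a per-file reader.
  // It does not care where the paths came from, which is what makes it composable.
  static <T> List<T> readFiles(List<Path> files, Function<Path, List<T>> reader) {
    List<T> out = new ArrayList<>();
    for (Path f : files) {
      out.addAll(reader.apply(f));
    }
    return out;
  }

  // Per-file reader returning the file's lines (a stand-in for Avro decoding).
  static List<String> readLines(Path file) {
    try {
      return Files.readAllLines(file);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

`readFiles` takes an arbitrary list of paths. The same reader therefore works whether the paths came from `matchFiles`, a hand-built list, or any other source.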



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
