[ 
https://issues.apache.org/jira/browse/BEAM-9894?focusedWorklogId=438719&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-438719
 ]

ASF GitHub Bot logged work on BEAM-9894:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/May/20 12:46
            Start Date: 29/May/20 12:46
    Worklog Time Spent: 10m 
      Work Description: purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432458477



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");

Review comment:
       Yes, it can be `nullable` for the `Write` method. Thanks for spotting this :)
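
       For illustration only, here is a minimal sketch of what a relaxed argument
       check could look like, assuming the comment means that
       `storageIntegrationName` may be null on the write path. `LocationSketch`,
       `require` and the staging-bucket check are hypothetical stand-ins written
       for this example; they are not the classes or checks from the pull request.

```java
/**
 * Illustrative sketch only. LocationSketch is a hypothetical stand-in for the
 * connector's Location class, and the relaxed rule below is an assumption.
 */
final class WriteArgumentCheckSketch {

  /** Hypothetical value holder; either field may be null. */
  static final class LocationSketch {
    private final String storageIntegrationName;
    private final String stagingBucketName;

    LocationSketch(String storageIntegrationName, String stagingBucketName) {
      this.storageIntegrationName = storageIntegrationName;
      this.stagingBucketName = stagingBucketName;
    }

    String getStorageIntegrationName() {
      return storageIntegrationName;
    }

    String getStagingBucketName() {
      return stagingBucketName;
    }
  }

  /** Plain-Java precondition helper: throws IllegalArgumentException when the condition fails. */
  static void require(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  /** Validates the location without insisting on a storage integration name. */
  static void checkArguments(LocationSketch loc) {
    require(loc != null, "via() is required");
    // Assumption: the staging bucket is still needed because the temporary
    // path is derived from it; the storage integration name is not checked.
    require(
        loc.getStagingBucketName() != null,
        "location with stagingBucketName is required");
  }

  public static void main(String[] args) {
    // A location with no storage integration name passes the relaxed check.
    checkArguments(new LocationSketch(null, "example-bucket"));
    System.out.println("location without storageIntegrationName accepted");
  }
}
```

       Whether the staging bucket should stay mandatory is a separate design
       choice; the sketch keeps that check only because the quoted `expand()`
       builds the staging directory from `getStagingBucketName()`.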




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 438719)
    Time Spent: 4h 50m  (was: 4h 40m)

> Add batch SnowflakeIO.Write to Java SDK
> ---------------------------------------
>
>                 Key: BEAM-9894
>                 URL: https://issues.apache.org/jira/browse/BEAM-9894
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-ideas
>            Reporter: Dariusz Aniszewski
>            Assignee: Pawel Urbanowicz
>            Priority: P2
>          Time Spent: 4h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
