[ 
https://issues.apache.org/jira/browse/BEAM-11936?focusedWorklogId=752514&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-752514
 ]

ASF GitHub Bot logged work on BEAM-11936:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Apr/22 21:23
            Start Date: 04/Apr/22 21:23
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on code in PR #17257:
URL: https://github.com/apache/beam/pull/17257#discussion_r842148641


##########
sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java:
##########
@@ -1025,50 +1020,44 @@ private void checkArguments(PCollection<T> input) {
                   .discardingFiredPanes());
 
       int shards = (getShardsNumber() > 0) ? getShardsNumber() : DEFAULT_STREAMING_SHARDS_NUMBER;
-      PCollection files = writeFiles(inputInGlobalWindow, stagingBucketDir, shards);
+      PCollection<String> files = writeFiles(inputInGlobalWindow, stagingBucketDir, shards);
 
       /* Ensuring that files will be ingested after flush time */
       files =
-          (PCollection)
-              files.apply(
-                  "Apply User Trigger",
-                  Window.<T>into(new GlobalWindows())
-                      .triggering(
-                          Repeatedly.forever(
-                              AfterProcessingTime.pastFirstElementInPane()
-                                  .plusDelayOf(getFlushTimeLimit())))
-                      .discardingFiredPanes());
-      files =
-          (PCollection)
-              files.apply(
-                  "Create list of files for loading via SnowPipe",
-                  Combine.globally(new Concatenate()).withoutDefaults());
+          files.apply(
+              "Apply User Trigger",
+              Window.<String>into(new GlobalWindows())
+                  .triggering(
+                      Repeatedly.forever(
+                          AfterProcessingTime.pastFirstElementInPane()
+                              .plusDelayOf(getFlushTimeLimit())))
+                  .discardingFiredPanes());
+      PCollection<List<String>> files_concatenated =
+          files.apply(
+              "Create list of files for loading via SnowPipe",
+              Combine.globally(new Concatenate()).withoutDefaults());
 
-      return (PCollection)
-          files.apply("Stream files to table", streamToTable(snowflakeService, stagingBucketDir));
+      return files_concatenated.apply(
+          "Stream files to table", streamToTable(snowflakeServices, stagingBucketDir));
     }
 
-    private PCollection writeBatch(PCollection input, ValueProvider<String> stagingBucketDir) {
-      SnowflakeService snowflakeService =
-          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeBatchServiceImpl();
+    private PCollection<Void> writeBatch(
+        PCollection<T> input, ValueProvider<String> stagingBucketDir) {
+      SnowflakeServices snowflakeServices =
+          getSnowflakeServices() != null ? getSnowflakeServices() : new SnowflakeServicesImpl();
 
       PCollection<String> files = writeBatchFiles(input, stagingBucketDir);
 
       // Combining PCollection of files as a side input into one list of files
       ListCoder<String> coder = ListCoder.of(StringUtf8Coder.of());
-      files =
-          (PCollection)
-              files
-                  .getPipeline()
-                  .apply(
-                      Reify.viewInGlobalWindow(
-                          (PCollectionView) files.apply(View.asList()), coder));
+      PCollection<List<String>> reified_files =

Review Comment:
   Checkstyle rejects this snake_case variable name (that's the Java PreCommit error); it expects lowerCamelCase.
   ```suggestion
         PCollection<List<String>> reifiedFiles =
   ```
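
To illustrate why names like `reified_files` and `files_concatenated` trip the check, here is a minimal sketch of a lowerCamelCase test. The regular expression and the class and method names are assumptions made for illustration; the pattern actually configured in the project's checkstyle rules may differ.

```java
import java.util.regex.Pattern;

public class LocalNameCheckSketch {
  // Assumed lowerCamelCase pattern; the project's real checkstyle rule may differ.
  private static final Pattern LOWER_CAMEL = Pattern.compile("^[a-z][a-zA-Z0-9]*$");

  /** Returns true if the name contains only letters and digits and starts lowercase. */
  static boolean isLowerCamel(String name) {
    return LOWER_CAMEL.matcher(name).matches();
  }

  public static void main(String[] args) {
    String[] names = {
      "reified_files", "reifiedFiles", "files_concatenated", "filesConcatenated"
    };
    for (String name : names) {
      // The underscore is outside the allowed character class, so snake_case fails.
      System.out.println(name + " -> " + (isLowerCamel(name) ? "ok" : "rejected"));
    }
  }
}
```

Under this assumed pattern the underscore is what fails the match, so the camelCase rename in the suggestion above would be enough.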



##########
sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java:
##########
@@ -1025,50 +1020,44 @@ private void checkArguments(PCollection<T> input) {
                   .discardingFiredPanes());
 
       int shards = (getShardsNumber() > 0) ? getShardsNumber() : DEFAULT_STREAMING_SHARDS_NUMBER;
-      PCollection files = writeFiles(inputInGlobalWindow, stagingBucketDir, shards);
+      PCollection<String> files = writeFiles(inputInGlobalWindow, stagingBucketDir, shards);
 
       /* Ensuring that files will be ingested after flush time */
       files =
-          (PCollection)
-              files.apply(
-                  "Apply User Trigger",
-                  Window.<T>into(new GlobalWindows())
-                      .triggering(
-                          Repeatedly.forever(
-                              AfterProcessingTime.pastFirstElementInPane()
-                                  .plusDelayOf(getFlushTimeLimit())))
-                      .discardingFiredPanes());
-      files =
-          (PCollection)
-              files.apply(
-                  "Create list of files for loading via SnowPipe",
-                  Combine.globally(new Concatenate()).withoutDefaults());
+          files.apply(
+              "Apply User Trigger",
+              Window.<String>into(new GlobalWindows())
+                  .triggering(
+                      Repeatedly.forever(
+                          AfterProcessingTime.pastFirstElementInPane()
+                              .plusDelayOf(getFlushTimeLimit())))
+                  .discardingFiredPanes());
+      PCollection<List<String>> files_concatenated =

Review Comment:
   Similarly here: checkstyle will also reject `files_concatenated`; consider `filesConcatenated`.



##########
sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeServices.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.services;
+
+import java.io.Serializable;
+
+/** Interface which defines common methods for interacting with Snowflake. */
+public interface SnowflakeServices extends Serializable {

Review Comment:
   Is there any chance the `SnowflakeService` -> `SnowflakeServices` rename will be a breaking change for our users? These interfaces are public, after all.
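
If the rename does turn out to break users, one possible mitigation is to keep the old name as a deprecated alias of the new one. The sketch below is only an illustration of that idea: the `describe()` method, `UserFakeService`, and the wrapper class are hypothetical and are not taken from the PR or from the real interfaces.

```java
import java.io.Serializable;

public class RenameCompatSketch {
  /** New name, standing in for the renamed interface. */
  interface SnowflakeServices extends Serializable {
    String describe(); // hypothetical method, for illustration only
  }

  /** Old name kept as a deprecated alias so code written against it still compiles. */
  @Deprecated
  interface SnowflakeService extends SnowflakeServices {}

  /** Hypothetical user class that was written against the old name. */
  static class UserFakeService implements SnowflakeService {
    @Override
    public String describe() {
      return "user-fake";
    }
  }

  /** Stand-in for library code that now accepts the new type. */
  static String use(SnowflakeServices services) {
    return services.describe();
  }

  public static void main(String[] args) {
    // An implementation of the old interface is still accepted where the new type is expected.
    System.out.println(use(new UserFakeService()));
  }
}
```

This only covers callers that implement or pass the old type into APIs expecting the new one. Code that expects to receive the old type back from the library would still need to change, so whether an alias is sufficient depends on how users actually consume the interface.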





Issue Time Tracking
-------------------

    Worklog Id:     (was: 752514)
    Time Spent: 66h 50m  (was: 66h 40m)

> Fix errorprone ignored warnings
> -------------------------------
>
>                 Key: BEAM-11936
>                 URL: https://issues.apache.org/jira/browse/BEAM-11936
>             Project: Beam
>          Issue Type: Task
>          Components: build-system, runner-core, sdk-java-core, 
> sdk-java-harness
>            Reporter: Brian Hulette
>            Priority: P3
>          Time Spent: 66h 50m
>  Remaining Estimate: 0h
>
> Upgrading to errorprone 2.3.4 (https://github.com/apache/beam/pull/14148) 
> required ignoring a lot of new warnings. We should fix the offending code and 
> re-enable these warnings.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
