sfc-gh-kbregula commented on code in PR #17257:
URL: https://github.com/apache/beam/pull/17257#discussion_r842188447


##########
sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java:
##########
@@ -1025,50 +1020,44 @@ private void checkArguments(PCollection<T> input) {
                   .discardingFiredPanes());
 
       int shards = (getShardsNumber() > 0) ? getShardsNumber() : 
DEFAULT_STREAMING_SHARDS_NUMBER;
-      PCollection files = writeFiles(inputInGlobalWindow, stagingBucketDir, 
shards);
+      PCollection<String> files = writeFiles(inputInGlobalWindow, 
stagingBucketDir, shards);
 
       /* Ensuring that files will be ingested after flush time */
       files =
-          (PCollection)
-              files.apply(
-                  "Apply User Trigger",
-                  Window.<T>into(new GlobalWindows())
-                      .triggering(
-                          Repeatedly.forever(
-                              AfterProcessingTime.pastFirstElementInPane()
-                                  .plusDelayOf(getFlushTimeLimit())))
-                      .discardingFiredPanes());
-      files =
-          (PCollection)
-              files.apply(
-                  "Create list of files for loading via SnowPipe",
-                  Combine.globally(new Concatenate()).withoutDefaults());
+          files.apply(
+              "Apply User Trigger",
+              Window.<String>into(new GlobalWindows())
+                  .triggering(
+                      Repeatedly.forever(
+                          AfterProcessingTime.pastFirstElementInPane()
+                              .plusDelayOf(getFlushTimeLimit())))
+                  .discardingFiredPanes());
+      PCollection<List<String>> files_concatenated =
+          files.apply(
+              "Create list of files for loading via SnowPipe",
+              Combine.globally(new Concatenate()).withoutDefaults());
 
-      return (PCollection)
-          files.apply("Stream files to table", streamToTable(snowflakeService, 
stagingBucketDir));
+      return files_concatenated.apply(
+          "Stream files to table", streamToTable(snowflakeServices, 
stagingBucketDir));
     }
 
-    private PCollection writeBatch(PCollection input, ValueProvider<String> 
stagingBucketDir) {
-      SnowflakeService snowflakeService =
-          getSnowflakeService() != null ? getSnowflakeService() : new 
SnowflakeBatchServiceImpl();
+    private PCollection<Void> writeBatch(
+        PCollection<T> input, ValueProvider<String> stagingBucketDir) {
+      SnowflakeServices snowflakeServices =
+          getSnowflakeServices() != null ? getSnowflakeServices() : new 
SnowflakeServicesImpl();
 
       PCollection<String> files = writeBatchFiles(input, stagingBucketDir);
 
       // Combining PCollection of files as a side input into one list of files
       ListCoder<String> coder = ListCoder.of(StringUtf8Coder.of());
-      files =
-          (PCollection)
-              files
-                  .getPipeline()
-                  .apply(
-                      Reify.viewInGlobalWindow(
-                          (PCollectionView) files.apply(View.asList()), 
coder));
+      PCollection<List<String>> reified_files =

Review Comment:
   I see. Thanks for paying attention. I will never find the full set of commands 
that would let me check all stylistic changes to the code locally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to