[
https://issues.apache.org/jira/browse/BEAM-11936?focusedWorklogId=752514&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-752514
]
ASF GitHub Bot logged work on BEAM-11936:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 04/Apr/22 21:23
Start Date: 04/Apr/22 21:23
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on code in PR #17257:
URL: https://github.com/apache/beam/pull/17257#discussion_r842148641
##########
sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java:
##########
@@ -1025,50 +1020,44 @@ private void checkArguments(PCollection<T> input) {
.discardingFiredPanes());
int shards = (getShardsNumber() > 0) ? getShardsNumber() : DEFAULT_STREAMING_SHARDS_NUMBER;
- PCollection files = writeFiles(inputInGlobalWindow, stagingBucketDir, shards);
+ PCollection<String> files = writeFiles(inputInGlobalWindow, stagingBucketDir, shards);
/* Ensuring that files will be ingested after flush time */
files =
- (PCollection)
- files.apply(
- "Apply User Trigger",
- Window.<T>into(new GlobalWindows())
- .triggering(
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(getFlushTimeLimit())))
- .discardingFiredPanes());
- files =
- (PCollection)
- files.apply(
- "Create list of files for loading via SnowPipe",
- Combine.globally(new Concatenate()).withoutDefaults());
+ files.apply(
+ "Apply User Trigger",
+ Window.<String>into(new GlobalWindows())
+ .triggering(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(getFlushTimeLimit())))
+ .discardingFiredPanes());
+ PCollection<List<String>> files_concatenated =
+ files.apply(
+ "Create list of files for loading via SnowPipe",
+ Combine.globally(new Concatenate()).withoutDefaults());
- return (PCollection)
- files.apply("Stream files to table", streamToTable(snowflakeService, stagingBucketDir));
+ return files_concatenated.apply(
+ "Stream files to table", streamToTable(snowflakeServices, stagingBucketDir));
}
- private PCollection writeBatch(PCollection input, ValueProvider<String> stagingBucketDir) {
- SnowflakeService snowflakeService =
- getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeBatchServiceImpl();
+ private PCollection<Void> writeBatch(
+ PCollection<T> input, ValueProvider<String> stagingBucketDir) {
+ SnowflakeServices snowflakeServices =
+ getSnowflakeServices() != null ? getSnowflakeServices() : new SnowflakeServicesImpl();
PCollection<String> files = writeBatchFiles(input, stagingBucketDir);
// Combining PCollection of files as a side input into one list of files
ListCoder<String> coder = ListCoder.of(StringUtf8Coder.of());
- files =
- (PCollection)
- files
- .getPipeline()
- .apply(
- Reify.viewInGlobalWindow(
- (PCollectionView) files.apply(View.asList()), coder));
+ PCollection<List<String>> reified_files =
Review Comment:
Checkstyle rejects this variable name because of the underscore (that's the
cause of the Java PreCommit failure).
```suggestion
PCollection<List<String>> reifiedFiles =
```
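For illustration, the naming rule behind this failure can be sketched as a small check. This is a minimal sketch, assuming a lowerCamelCase pattern of `^[a-z][a-zA-Z0-9]*$`; the pattern and the class name `LocalVariableNameCheck` are assumptions made for this example, and Beam's actual checkstyle configuration may differ.

```java
import java.util.regex.Pattern;

public class LocalVariableNameCheck {
  // Assumed pattern (not taken from Beam's checkstyle config): starts with a
  // lowercase letter, followed only by letters and digits, so no underscores.
  private static final Pattern LOWER_CAMEL = Pattern.compile("^[a-z][a-zA-Z0-9]*$");

  /** Returns true if the name matches the assumed lowerCamelCase pattern. */
  public static boolean isValid(String name) {
    return LOWER_CAMEL.matcher(name).matches();
  }

  public static void main(String[] args) {
    String[] names = {
      "reified_files", "reifiedFiles", "files_concatenated", "filesConcatenated"
    };
    for (String name : names) {
      System.out.println(name + " -> " + (isValid(name) ? "ok" : "rejected"));
    }
  }
}
```

Under that assumed pattern, both snake_case names from this diff are rejected and their camelCase equivalents pass.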
##########
sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java:
##########
@@ -1025,50 +1020,44 @@ private void checkArguments(PCollection<T> input) {
.discardingFiredPanes());
int shards = (getShardsNumber() > 0) ? getShardsNumber() : DEFAULT_STREAMING_SHARDS_NUMBER;
- PCollection files = writeFiles(inputInGlobalWindow, stagingBucketDir, shards);
+ PCollection<String> files = writeFiles(inputInGlobalWindow, stagingBucketDir, shards);
/* Ensuring that files will be ingested after flush time */
files =
- (PCollection)
- files.apply(
- "Apply User Trigger",
- Window.<T>into(new GlobalWindows())
- .triggering(
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(getFlushTimeLimit())))
- .discardingFiredPanes());
- files =
- (PCollection)
- files.apply(
- "Create list of files for loading via SnowPipe",
- Combine.globally(new Concatenate()).withoutDefaults());
+ files.apply(
+ "Apply User Trigger",
+ Window.<String>into(new GlobalWindows())
+ .triggering(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(getFlushTimeLimit())))
+ .discardingFiredPanes());
+ PCollection<List<String>> files_concatenated =
Review Comment:
Similarly here: checkstyle will flag `files_concatenated` as well (e.g. use
`filesConcatenated` instead).
##########
sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeServices.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.services;
+
+import java.io.Serializable;
+
+/** Interface which defines common methods for interacting with Snowflake. */
+public interface SnowflakeServices extends Serializable {
Review Comment:
Is there any chance the `SnowflakeService` -> `SnowflakeServices` rename
will be a breaking change for our users? The interface is public, after all.
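If the rename does turn out to be breaking, one common mitigation is to keep the old name as a deprecated alias of the new one. The following is a sketch only, under stated assumptions: the interfaces are nested stand-ins rather than the real Beam types, and `describe()`, `LegacyUserService`, and `use` are hypothetical names invented for this example.

```java
import java.io.Serializable;

public class RenameCompat {
  /** Stand-in for the new interface name (hypothetical method for illustration). */
  public interface SnowflakeServices extends Serializable {
    String describe();
  }

  /** Old name kept as a deprecated alias so existing implementors still compile. */
  @Deprecated
  public interface SnowflakeService extends SnowflakeServices {}

  /** Hypothetical user class written against the old name. */
  public static class LegacyUserService implements SnowflakeService {
    @Override
    public String describe() {
      return "legacy";
    }
  }

  /** Hypothetical library entry point that now accepts the new type. */
  public static String use(SnowflakeServices services) {
    return services.describe();
  }

  public static void main(String[] args) {
    // A class implementing the old interface is still accepted by the new API.
    System.out.println(use(new LegacyUserService()));
  }
}
```

This only covers user classes that implement the old name. Code that receives the type back from the library and assigns it to a variable of the old type would still need changes.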
Issue Time Tracking
-------------------
Worklog Id: (was: 752514)
Time Spent: 66h 50m (was: 66h 40m)
> Fix errorprone ignored warnings
> -------------------------------
>
> Key: BEAM-11936
> URL: https://issues.apache.org/jira/browse/BEAM-11936
> Project: Beam
> Issue Type: Task
> Components: build-system, runner-core, sdk-java-core,
> sdk-java-harness
> Reporter: Brian Hulette
> Priority: P3
> Time Spent: 66h 50m
> Remaining Estimate: 0h
>
> Upgrading to errorprone 2.3.4 (https://github.com/apache/beam/pull/14148)
> required ignoring a lot of new warnings. We should fix the offending code and
> re-enable these warnings.