[
https://issues.apache.org/jira/browse/BEAM-11936?focusedWorklogId=752533&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-752533
]
ASF GitHub Bot logged work on BEAM-11936:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 04/Apr/22 22:32
Start Date: 04/Apr/22 22:32
Worklog Time Spent: 10m
Work Description: sfc-gh-kbregula commented on code in PR #17257:
URL: https://github.com/apache/beam/pull/17257#discussion_r842188447
##########
sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java:
##########
@@ -1025,50 +1020,44 @@ private void checkArguments(PCollection<T> input) {
.discardingFiredPanes());
int shards = (getShardsNumber() > 0) ? getShardsNumber() :
DEFAULT_STREAMING_SHARDS_NUMBER;
- PCollection files = writeFiles(inputInGlobalWindow, stagingBucketDir,
shards);
+ PCollection<String> files = writeFiles(inputInGlobalWindow,
stagingBucketDir, shards);
/* Ensuring that files will be ingested after flush time */
files =
- (PCollection)
- files.apply(
- "Apply User Trigger",
- Window.<T>into(new GlobalWindows())
- .triggering(
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(getFlushTimeLimit())))
- .discardingFiredPanes());
- files =
- (PCollection)
- files.apply(
- "Create list of files for loading via SnowPipe",
- Combine.globally(new Concatenate()).withoutDefaults());
+ files.apply(
+ "Apply User Trigger",
+ Window.<String>into(new GlobalWindows())
+ .triggering(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(getFlushTimeLimit())))
+ .discardingFiredPanes());
+ PCollection<List<String>> files_concatenated =
+ files.apply(
+ "Create list of files for loading via SnowPipe",
+ Combine.globally(new Concatenate()).withoutDefaults());
- return (PCollection)
- files.apply("Stream files to table", streamToTable(snowflakeService,
stagingBucketDir));
+ return files_concatenated.apply(
+ "Stream files to table", streamToTable(snowflakeServices,
stagingBucketDir));
}
- private PCollection writeBatch(PCollection input, ValueProvider<String>
stagingBucketDir) {
- SnowflakeService snowflakeService =
- getSnowflakeService() != null ? getSnowflakeService() : new
SnowflakeBatchServiceImpl();
+ private PCollection<Void> writeBatch(
+ PCollection<T> input, ValueProvider<String> stagingBucketDir) {
+ SnowflakeServices snowflakeServices =
+ getSnowflakeServices() != null ? getSnowflakeServices() : new
SnowflakeServicesImpl();
PCollection<String> files = writeBatchFiles(input, stagingBucketDir);
// Combining PCollection of files as a side input into one list of files
ListCoder<String> coder = ListCoder.of(StringUtf8Coder.of());
- files =
- (PCollection)
- files
- .getPipeline()
- .apply(
- Reify.viewInGlobalWindow(
- (PCollectionView) files.apply(View.asList()),
coder));
+ PCollection<List<String>> reified_files =
Review Comment:
I see. Thanks for paying attention. I will never find a set of all commands
that will allow me to check all stylistically changes to the code locally.
Issue Time Tracking
-------------------
Worklog Id: (was: 752533)
Time Spent: 67h 10m (was: 67h)
> Fix errorprone ignored warnings
> -------------------------------
>
> Key: BEAM-11936
> URL: https://issues.apache.org/jira/browse/BEAM-11936
> Project: Beam
> Issue Type: Task
> Components: build-system, runner-core, sdk-java-core,
> sdk-java-harness
> Reporter: Brian Hulette
> Priority: P3
> Time Spent: 67h 10m
> Remaining Estimate: 0h
>
> Upgrading to errorprone 2.3.4 (https://github.com/apache/beam/pull/14148)
> required ignoring a lot of new warnings. We should fix the offending code and
> re-enable these warnings.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)