[
https://issues.apache.org/jira/browse/BEAM-9891?focusedWorklogId=471662&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-471662
]
ASF GitHub Bot logged work on BEAM-9891:
----------------------------------------
Author: ASF GitHub Bot
Created on: 17/Aug/20 20:33
Start Date: 17/Aug/20 20:33
Worklog Time Spent: 10m
Work Description: amaliujia commented on a change in pull request #12601:
URL: https://github.com/apache/beam/pull/12601#discussion_r471758078
##########
File path:
sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
##########
@@ -121,19 +152,26 @@ public static void runUsingSqlTransform(String[] args) throws Exception {
String queryString = QueryReader.readQuery(queryNameArr[i]);
PCollectionTuple tables = getTables(pipelines[i], csvFormat, queryNameArr[i]);
- tables
- .apply(
- SqlTransform.query(queryString))
- .apply(
- MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()))
- .apply(TextIO.write()
- .to(resultDirectory + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName())
- .withSuffix(".txt")
- .withNumShards(1));
+ try {
+ tables
+ .apply(
+ SqlTransform.query(queryString))
+ .apply(
+ MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()))
+ .apply(TextIO.write()
+ .to(RESULT_DIRECTORY + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName())
+ .withSuffix(".txt")
+ .withNumShards(1));
+ } catch (Exception e) {
+ System.out.println(queryNameArr[i] + " failed to execute");
Review comment:
Can you replace this with LOG? Here is an example of LOG:
https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java#L75
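A minimal sketch of the requested change, assuming the failure is reported from a small helper. QueryLauncher and tryBuild are hypothetical names, and the Runnable stands in for the SqlTransform/TextIO construction in the diff. Beam code such as CalciteQueryPlanner uses SLF4J. This sketch uses java.util.logging only so it runs with the JDK alone. Passing the exception to the logger keeps the stack trace, which the println discards.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper: reports a per-query failure through a class-level logger. */
class QueryLauncher {
  // One static logger per class, named after the class (the LOG convention).
  private static final Logger LOG = Logger.getLogger(QueryLauncher.class.getName());

  /**
   * Runs the construction step for one query (a stand-in for applying
   * SqlTransform and TextIO). Returns true on success, false after logging.
   */
  static boolean tryBuild(String queryName, Runnable buildStep) {
    try {
      buildStep.run();
      return true;
    } catch (RuntimeException e) {
      // Log at SEVERE and attach the exception so its stack trace is kept.
      LOG.log(Level.SEVERE, queryName + " failed to execute", e);
      return false;
    }
  }
}
```

With SLF4J, as in the linked example, the equivalent call would be `LOG.error("{} failed to execute", queryName, e);`. A trailing Throwable argument is logged as the exception.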
##########
File path:
sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java
##########
@@ -97,7 +128,7 @@ public static void runUsingSqlTransform(String[] args) throws Exception {
// Using ExecutorService and CompletionService to fulfill multi-threading functionality
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
- CompletionService<PipelineResult> completion = new ExecutorCompletionService<>(executor);
+ CompletionService<TpcdsRunResult> completion = new ExecutorCompletionService<>(executor);
Review comment:
Can you also check the state of the PipelineResult:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L38?
Make sure a query is reported as successful only when its job state is successful (State.DONE).
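A sketch of the check the reviewer asks for, assuming each task returns its query name together with the job's terminal state. To keep it runnable without Beam, JobState is a stand-in that mirrors a few values of PipelineResult.State. QueryOutcome and ConcurrentRunner are hypothetical and do not reproduce the real TpcdsRunResult. Results come back from the CompletionService in completion order, and only DONE counts as success.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Stand-in for a few values of org.apache.beam.sdk.PipelineResult.State. */
enum JobState { DONE, FAILED, CANCELLED, UNKNOWN }

/** Hypothetical per-query outcome: query name plus terminal job state. */
final class QueryOutcome {
  final String queryName;
  final JobState state;

  QueryOutcome(String queryName, JobState state) {
    this.queryName = queryName;
    this.state = state;
  }

  /** Only DONE is success; FAILED, CANCELLED, UNKNOWN must not be reported as such. */
  boolean isSuccessful() {
    return state == JobState.DONE;
  }
}

final class ConcurrentRunner {
  /** Runs tasks on a fixed pool; returns the names of queries that ended in DONE. */
  static List<String> runAll(List<Callable<QueryOutcome>> tasks, int nThreads)
      throws InterruptedException, ExecutionException {
    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
    try {
      CompletionService<QueryOutcome> completion = new ExecutorCompletionService<>(executor);
      for (Callable<QueryOutcome> task : tasks) {
        completion.submit(task);
      }
      List<String> succeeded = new ArrayList<>();
      // take() returns futures in completion order, not submission order.
      for (int i = 0; i < tasks.size(); i++) {
        QueryOutcome outcome = completion.take().get();
        if (outcome.isSuccessful()) {
          System.out.println(outcome.queryName + " succeeded");
          succeeded.add(outcome.queryName);
        } else {
          System.out.println(outcome.queryName + " finished in state " + outcome.state);
        }
      }
      return succeeded;
    } finally {
      executor.shutdown();
    }
  }
}
```

In Beam itself, `waitUntilFinish()` returns the terminal `PipelineResult.State`, so the comparison would be against `PipelineResult.State.DONE`.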
##########
File path:
sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java
##########
@@ -24,17 +24,30 @@
/**
* To fulfill multi-threaded execution
*/
-public class TpcdsRun implements Callable<PipelineResult> {
+public class TpcdsRun implements Callable<TpcdsRunResult> {
private final Pipeline pipeline;
public TpcdsRun (Pipeline pipeline) {
this.pipeline = pipeline;
}
@Override
- public PipelineResult call() {
- PipelineResult pipelineResult = pipeline.run();
- pipelineResult.waitUntilFinish();
- return pipelineResult;
+ public TpcdsRunResult call() {
+ TpcdsRunResult tpcdsRunResult;
+
+ try {
+ PipelineResult pipelineResult = pipeline.run();
+ long startTimeStamp = System.currentTimeMillis();
+ pipelineResult.waitUntilFinish();
+ long endTimeStamp = System.currentTimeMillis();
Review comment:
Check the pipeline result state here as well.
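The hunk above shows only the start of the new call(). This is a sketch of one way it could record both elapsed time and terminal state. RunState, TimedRunResult and TimedRun are hypothetical names, and the Supplier stands in for pipeline.run() followed by waitUntilFinish(). One detail worth checking: the diff takes startTimeStamp after pipeline.run(). On a runner whose run() blocks until the job completes (the DirectRunner's blockOnRun option defaults to true, for example), that would leave most of the runtime out of the measurement. The sketch therefore starts the clock before the combined run-and-wait step.

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

/** Stand-in for a few values of Beam's PipelineResult.State. */
enum RunState { DONE, FAILED, CANCELLED, UNKNOWN }

/** Hypothetical result: terminal state plus wall-clock seconds for run + wait. */
final class TimedRunResult {
  final RunState state;
  final double elapsedSeconds;

  TimedRunResult(RunState state, double elapsedSeconds) {
    this.state = state;
    this.elapsedSeconds = elapsedSeconds;
  }

  boolean isSuccessful() {
    return state == RunState.DONE;
  }
}

/** Callable that times a job and captures its terminal state. */
final class TimedRun implements Callable<TimedRunResult> {
  // Stand-in for: pipeline.run() then waitUntilFinish(), returning the final state.
  private final Supplier<RunState> runAndWait;

  TimedRun(Supplier<RunState> runAndWait) {
    this.runAndWait = runAndWait;
  }

  @Override
  public TimedRunResult call() {
    long start = System.currentTimeMillis(); // before run(), in case run() blocks
    RunState state;
    try {
      state = runAndWait.get();
    } catch (RuntimeException e) {
      // A job that throws is recorded as FAILED instead of propagating.
      state = RunState.FAILED;
    }
    long end = System.currentTimeMillis();
    return new TimedRunResult(state, (end - start) / 1000.0);
  }
}
```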
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 471662)
Time Spent: 4h 50m (was: 4h 40m)
> TPC benchmarks for BeamSQL
> --------------------------
>
> Key: BEAM-9891
> URL: https://issues.apache.org/jira/browse/BEAM-9891
> Project: Beam
> Issue Type: Task
> Components: dsl-sql
> Reporter: Rui Wang
> Assignee: Yuwei Fu
> Priority: P2
> Time Spent: 4h 50m
> Remaining Estimate: 0h
>
> TPC benchmarks [1] are an industry standard that we can adopt for BeamSQL.
> Spark [2] and Flink [3] already have reference integrations that use them.
> This Jira tracks the effort to integrate TPC benchmarks with BeamSQL.
> [1]: http://www.tpc.org/information/benchmarks.asp
> [2]: https://github.com/databricks/spark-sql-perf
> [3]: https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-tpcds-test
--
This message was sent by Atlassian Jira
(v8.3.4#803005)