Repository: incubator-beam Updated Branches: refs/heads/master 78abd964a -> c6aac3b26
Re-interrupt current thread when ignoring InterruptedException Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a8eeaf5e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a8eeaf5e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a8eeaf5e Branch: refs/heads/master Commit: a8eeaf5e1f0c74beb96ad78e73af4751cd650262 Parents: 78abd96 Author: Pei He <[email protected]> Authored: Mon Mar 14 17:34:57 2016 -0700 Committer: Luke Cwik <[email protected]> Committed: Wed Mar 30 16:27:23 2016 -0700 ---------------------------------------------------------------------- .../dataflow/examples/common/DataflowExampleUtils.java | 8 +++----- .../com/google/cloud/dataflow/sdk/io/BigQueryIO.java | 3 +++ .../dataflow/sdk/io/BoundedReadFromUnboundedSource.java | 6 +++--- .../sdk/runners/BlockingDataflowPipelineRunner.java | 3 +++ .../cloud/dataflow/sdk/runners/DataflowPipelineJob.java | 3 +++ .../sdk/transforms/IntraBundleParallelization.java | 1 + .../cloud/dataflow/sdk/util/BigQueryTableInserter.java | 2 ++ .../dataflow/sdk/util/BigQueryTableRowIterator.java | 12 +++++++----- .../com/google/cloud/dataflow/sdk/util/GcsUtil.java | 4 ++++ 9 files changed, 29 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java index 4dfdd85..23562d0 100644 --- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java +++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleUtils.java @@ -50,6 +50,7 @@ import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collection; @@ -108,6 +109,7 @@ public class DataflowExampleUtils { } } while (BackOffUtils.next(sleeper, backOff)); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Ignore InterruptedException } Throwables.propagate(lastException); @@ -442,11 +444,7 @@ public class DataflowExampleUtils { System.out.println( "The example pipeline is still running. Verifying the cancellation."); } - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - // Ignore - } + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); } if (!cancellationVerified) { System.out.println("Failed to verify the cancellation for job: " + job.getJobId()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java index ab7df6f..8b08225 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java @@ -1438,6 +1438,9 @@ public class BigQueryIO { LOG.info("Number of records read from BigQuery: {}", elems.size()); context.setPCollection(context.getOutput(transform), elems); } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java index 52c730c..3fa9c69 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java @@ -29,6 +29,7 @@ import com.google.cloud.dataflow.sdk.util.IntervalBoundedExponentialBackOff; import com.google.cloud.dataflow.sdk.util.ValueWithRecordId; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PInput; +import com.google.common.util.concurrent.Uninterruptibles; import org.joda.time.Duration; import org.joda.time.Instant; @@ -37,6 +38,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; /** @@ -234,9 +236,7 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PInput, PCollection<T if (reader.advance()) { return true; } - try { - Thread.sleep(nextSleep); - } catch (InterruptedException e) {} + Uninterruptibles.sleepUninterruptibly(nextSleep, TimeUnit.MILLISECONDS); nextSleep = backoff.nextBackOffMillis(); } finalizeCheckpoint(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java index 95e3dfe..7b45e5b 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/BlockingDataflowPipelineRunner.java @@ -118,6 +118,9 @@ public class BlockingDataflowPipelineRunner extends BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS, new MonitoringUtil.PrintHandler(options.getJobMessageOutput())); } catch (IOException | InterruptedException ex) { + if (ex instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } LOG.debug("Exception caught while retrieving status for job {}", job.getJobId(), ex); throw new DataflowServiceException( job, "Exception caught while retrieving status for job " + job.getJobId(), ex); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java index e9f134c..c5173a9 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineJob.java @@ -348,6 +348,9 @@ public class DataflowPipelineJob implements PipelineResult { try { return BackOffUtils.next(sleeper, backoff); } catch (InterruptedException | IOException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw Throwables.propagate(e); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java index b6497b7..39d2dc8 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java @@ -204,6 +204,7 @@ public class IntraBundleParallelization { try { workTickets.acquire(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException("Interrupted while scheduling work", e); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java index cd51062..4a9ea6b 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java @@ -248,6 +248,7 @@ public class BigQueryTableInserter { } } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException("Interrupted while inserting " + rowsToPublish); } catch (ExecutionException e) { Throwables.propagate(e.getCause()); @@ -257,6 +258,7 @@ public class BigQueryTableInserter { try { Thread.sleep(backoff.nextBackOffMillis()); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException("Interrupted while waiting before retrying insert of " + retryRows); } LOG.info("Retrying failed inserts to BigQuery"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java index c2c80f7..75b3bb9 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java @@ -45,6 +45,7 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Uninterruptibles; import org.joda.time.Duration; import org.slf4j.Logger; @@ -59,6 +60,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; import java.util.Random; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -412,11 +414,8 @@ public class BigQueryTableRowIterator implements AutoCloseable { throw new IOException("Executing query " + query + " failed: " + error.getMessage()); } } - try { - Thread.sleep(QUERY_COMPLETION_POLL_TIME.getMillis()); - } catch (InterruptedException e) { - e.printStackTrace(); - } + Uninterruptibles.sleepUninterruptibly( + QUERY_COMPLETION_POLL_TIME.getMillis(), TimeUnit.MILLISECONDS); } } @@ -462,6 +461,9 @@ public class BigQueryTableRowIterator implements AutoCloseable { deleteDataset(temporaryDatasetId); } } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eeaf5e/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GcsUtil.java index 8fd258f..47adb59 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GcsUtil.java @@ -159,6 +159,9 @@ public class GcsUtil { IOException.class); return ImmutableList.of(gcsPattern); } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) { // If the path was not found, return an empty list. return ImmutableList.of(); @@ -343,6 +346,7 @@ public class GcsUtil { } throw e; } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException( String.format("Error while attempting to verify existence of bucket gs://%s", path.getBucket()), e);
