This is an automated email from the ASF dual-hosted git repository.
jwills pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/crunch.git
The following commit(s) were added to refs/heads/master by this push:
new 571b90c CRUNCH-678: Avoid unnecessary last modified time retrieval
571b90c is described below
commit 571b90c03e3010e7bb9badf4e6e441ab2164be56
Author: Andrew Olson <[email protected]>
AuthorDate: Tue Feb 19 16:46:20 2019 -0600
CRUNCH-678: Avoid unnecessary last modified time retrieval
Signed-off-by: Josh Wills <[email protected]>
---
crunch-core/src/main/java/org/apache/crunch/Target.java | 2 +-
.../java/org/apache/crunch/impl/dist/DistributedPipeline.java | 6 ++++--
.../apache/crunch/impl/dist/collect/BaseUnionCollection.java | 11 ++++++++---
.../org/apache/crunch/impl/dist/collect/BaseUnionTable.java | 11 ++++++++---
.../src/main/java/org/apache/crunch/impl/mem/MemPipeline.java | 2 ++
.../main/java/org/apache/crunch/io/impl/FileTargetImpl.java | 6 +++++-
6 files changed, 28 insertions(+), 10 deletions(-)
diff --git a/crunch-core/src/main/java/org/apache/crunch/Target.java b/crunch-core/src/main/java/org/apache/crunch/Target.java
index 03b6eef..4dec831 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Target.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Target.java
@@ -72,7 +72,7 @@ public interface Target {
*
* @param writeMode The strategy for handling existing outputs
* @param lastModifiedAt the time of the most recent modification to one of
the source inputs for handling based
- * on the provided {@code writeMode}.
+ * on the provided {@code writeMode}, or -1 if not
relevant for the provided {@code writeMode}
* @param conf The ever-useful {@code Configuration} instance
* @return true if the target did exist
*/
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
index 0afa766..abd318c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
@@ -238,8 +238,10 @@ public abstract class DistributedPipeline implements Pipeline {
pcollection = pcollection.parallelDo("UnionCollectionWrapper",
(MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
}
- boolean exists = target.handleExisting(writeMode, ((PCollectionImpl)
pcollection).getLastModifiedAt(),
- getConfiguration());
+ // Last modified time is only relevant when write mode is checkpoint
+ long lastModifiedAt = (writeMode == Target.WriteMode.CHECKPOINT)
+ ? ((PCollectionImpl) pcollection).getLastModifiedAt() : -1;
+ boolean exists = target.handleExisting(writeMode, lastModifiedAt,
getConfiguration());
if (exists && writeMode == Target.WriteMode.CHECKPOINT) {
SourceTarget<?> st = target.asSourceTarget(pcollection.getPType());
if (st == null) {
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionCollection.java
index ef10ee7..855bd18 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionCollection.java
@@ -50,9 +50,6 @@ public class BaseUnionCollection<S> extends PCollectionImpl<S> {
throw new IllegalStateException("Cannot union PCollections from
different Pipeline instances");
}
size += parent.getSize();
- if (parent.getLastModifiedAt() > lastModifiedAt) {
- this.lastModifiedAt = parent.getLastModifiedAt();
- }
}
}
@@ -72,6 +69,14 @@ public class BaseUnionCollection<S> extends PCollectionImpl<S> {
@Override
public long getLastModifiedAt() {
+ if (lastModifiedAt == -1) {
+ for (PCollectionImpl<S> parent : parents) {
+ long parentLastModifiedAt = parent.getLastModifiedAt();
+ if (parentLastModifiedAt > lastModifiedAt) {
+ lastModifiedAt = parentLastModifiedAt;
+ }
+ }
+ }
return lastModifiedAt;
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionTable.java
index 4d688c3..58617fa 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionTable.java
@@ -56,9 +56,6 @@ public class BaseUnionTable<K, V> extends PTableBase<K, V> {
}
this.parents.add(parent);
size += parent.getSize();
- if (parent.getLastModifiedAt() > lastModifiedAt) {
- this.lastModifiedAt = parent.getLastModifiedAt();
- }
}
}
@@ -77,6 +74,14 @@ public class BaseUnionTable<K, V> extends PTableBase<K, V> {
@Override
public long getLastModifiedAt() {
+ if (lastModifiedAt == -1) {
+ for (PCollectionImpl<Pair<K, V>> parent : parents) {
+ long parentLastModifiedAt = parent.getLastModifiedAt();
+ if (parentLastModifiedAt > lastModifiedAt) {
+ lastModifiedAt = parentLastModifiedAt;
+ }
+ }
+ }
return lastModifiedAt;
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 4d4d5dd..9b1345c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -200,6 +200,8 @@ public class MemPipeline implements Pipeline {
@Override
public void write(PCollection<?> collection, Target target, Target.WriteMode
writeMode) {
+ // Last modified time does not need to be retrieved for this
+ // pipeline implementation
target.handleExisting(writeMode, -1, getConfiguration());
if (writeMode != Target.WriteMode.APPEND &&
activeTargets.contains(target)) {
throw new CrunchRuntimeException("Target " + target
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index e8b1dfe..17efabb 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -373,7 +373,11 @@ public class FileTargetImpl implements PathTarget {
exists = fs.exists(path);
if (exists) {
successful = fs.exists(getSuccessIndicator());
- lastModForTarget = SourceTargetHelper.getLastModifiedAt(fs, path);
+ // Last modified time is only relevant when the path exists and the
+ // write mode is checkpoint
+ if (successful && strategy == WriteMode.CHECKPOINT) {
+ lastModForTarget = SourceTargetHelper.getLastModifiedAt(fs, path);
+ }
}
} catch (IOException e) {
LOG.error("Exception checking existence of path: {}", path, e);