Repository: crunch
Updated Branches:
  refs/heads/master e520d9f6e -> e176b6166
CRUNCH-636: Make replication factor for temporary files configurable

Signed-off-by: Josh Wills <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/e176b616
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/e176b616
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/e176b616

Branch: refs/heads/master
Commit: e176b6166218fabc247eef25cbbc549271f8bd2d
Parents: e520d9f
Author: Attila Sasvari <[email protected]>
Authored: Mon Mar 20 11:17:55 2017 +0100
Committer: Josh Wills <[email protected]>
Committed: Mon Mar 20 11:53:55 2017 -0700

----------------------------------------------------------------------
 .../crunch/impl/dist/DistributedPipeline.java |  40 ++++++-
 .../crunch/impl/mr/plan/JobPrototype.java     |  94 ++++++++++++++-
 .../impl/dist/DistributedPipelineTest.java    |  96 +++++++++++++++
 .../crunch/impl/mr/plan/JobPrototypeTest.java | 117 +++++++++++++++++++
 4 files changed, 344 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/e176b616/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
index d3fb0d0..1deafd5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.impl.dist;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -51,6 +52,7 @@ import org.apache.crunch.io.From;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.crunch.io.To;
+import org.apache.crunch.io.impl.FileTargetImpl;
 import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
@@ -58,6 +60,7 @@ import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +74,7 @@ public abstract class DistributedPipeline implements Pipeline {
 
   private static final Logger LOG = LoggerFactory.getLogger(DistributedPipeline.class);
   private static final Random RANDOM = new Random();
+  private static final String CRUNCH_TMP_DIRS = "crunch.tmp.dirs";
 
   private final String name;
   protected final PCollectionFactory factory;
@@ -103,6 +107,22 @@ public abstract class DistributedPipeline implements Pipeline {
     this.nextAnonymousStageId = 0;
   }
 
+  public static boolean isTempDir(Job job, String outputPath) {
+    String tmpDirs = job.getConfiguration().get(CRUNCH_TMP_DIRS);
+
+    if (tmpDirs == null) {
+      return false;
+    }
+
+    for (String p : tmpDirs.split(":")) {
+      if (outputPath.contains(p)) {
+        LOG.debug(String.format("Matched temporary directory: %s in %s", p, outputPath));
+        return true;
+      }
+    }
+    return false;
+  }
+
   public PCollectionFactory getFactory() {
     return factory;
   }
@@ -390,7 +410,25 @@ public abstract class DistributedPipeline implements Pipeline {
   public Path createTempPath() {
     tempFileIndex++;
-    return new Path(getTempDirectory(), "p" + tempFileIndex);
+    Path path = new Path(getTempDirectory(), "p" + tempFileIndex);
+    storeTempDirLocation(path);
+    return path;
+  }
+
+  @VisibleForTesting
+  protected void storeTempDirLocation(Path t) {
+    String tmpCfg = conf.get(CRUNCH_TMP_DIRS);
+    String tmpDir = t.toString();
+
+    LOG.debug(String.format("Temporary directory created: %s", tmpDir));
+
+    if (tmpCfg != null && !tmpCfg.contains(tmpDir)) {
+      conf.set(CRUNCH_TMP_DIRS, String.format("%s:%s", tmpCfg, tmpDir));
+    }
+    else if (tmpCfg == null) {
+      conf.set(CRUNCH_TMP_DIRS, tmpDir);
+    }
+  }
 
   private synchronized Path getTempDirectory() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/e176b616/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index d23de3b..d31bfad 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -23,9 +23,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.crunch.Pipeline;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.crunch.Target;
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
+import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.collect.DoTable;
@@ -39,9 +40,11 @@ import org.apache.crunch.impl.mr.run.CrunchOutputFormat;
 import org.apache.crunch.impl.mr.run.CrunchReducer;
 import org.apache.crunch.impl.mr.run.NodeContext;
 import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.io.impl.FileTargetImpl;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.util.DistCache;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 
@@ -51,11 +54,18 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 class JobPrototype {
 
+  private static final String DFS_REPLICATION = "dfs.replication";
+  private static final String DFS_REPLICATION_INITIAL = "dfs.replication.initial";
+  private static final String CRUNCH_TMP_DIR_REPLICATION = "crunch.tmp.dir.replication";
+
   public static JobPrototype createMapReduceJob(int jobID, PGroupedTableImpl<?, ?> group,
-      Set<NodePath> inputs, Path workingPath) {
+                                                Set<NodePath> inputs, Path workingPath) {
     return new JobPrototype(jobID, inputs, group, workingPath);
   }
 
@@ -84,6 +94,7 @@ class JobPrototype {
     this.targetsToNodePaths = null;
   }
 
+  @VisibleForTesting
   private JobPrototype(int jobID, HashMultimap<Target, NodePath> outputPaths, Path workingPath) {
     this.jobID = jobID;
     this.group = null;
@@ -141,9 +152,13 @@ class JobPrototype {
     return job;
   }
 
+  private static final Logger LOG = LoggerFactory.getLogger(JobPrototype.class);
+
   private CrunchControlledJob build(
       Class<?> jarClass, Configuration conf, MRPipeline pipeline, int numOfJobs) throws IOException {
+
     Job job = new Job(conf);
+    LOG.debug(String.format("Replication factor: %s", job.getConfiguration().get(DFS_REPLICATION)));
     conf = job.getConfiguration();
     conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString());
     job.setJarByClass(jarClass);
@@ -152,19 +167,28 @@ class JobPrototype {
     Set<Target> allTargets = Sets.newHashSet();
     Path outputPath = new Path(workingPath, "output");
     MSCROutputHandler outputHandler = new MSCROutputHandler(job, outputPath, group == null);
+
+    boolean onlyHasTemporaryOutput = true;
+
     for (Target target : targetsToNodePaths.keySet()) {
       DoNode node = null;
+      LOG.debug("Target path: " + target);
       for (NodePath nodePath : targetsToNodePaths.get(target)) {
         if (node == null) {
           PType<?> ptype = nodePath.tail().getPType();
           node = DoNode.createOutputNode(target.toString(), target.getConverter(ptype), ptype);
           outputHandler.configureNode(node, target);
+
+          onlyHasTemporaryOutput &= DistributedPipeline.isTempDir(job, target.toString());
         }
         outputNodes.add(walkPath(nodePath.descendingIterator(), node));
       }
       allTargets.add(target);
     }
 
+    setJobReplication(job.getConfiguration(), onlyHasTemporaryOutput);
+
     Set<DoNode> mapSideNodes = Sets.newHashSet();
     if (mapSideNodePaths != null) {
       for (Target target : mapSideNodePaths.keySet()) {
@@ -243,6 +267,72 @@ class JobPrototype {
         completionHook);
   }
 
+  @VisibleForTesting
+  protected void setJobReplication(Configuration jobConfiguration, boolean onlyHasTemporaryOutput) {
+    String userSuppliedTmpDirReplication = jobConfiguration.get(CRUNCH_TMP_DIR_REPLICATION);
+    if (userSuppliedTmpDirReplication == null) {
+      return;
+    }
+
+    handleInitialReplication(jobConfiguration);
+
+    if (onlyHasTemporaryOutput) {
+      LOG.debug(String.format("Setting replication factor to: %s", userSuppliedTmpDirReplication));
+      jobConfiguration.set(DFS_REPLICATION, userSuppliedTmpDirReplication);
+    }
+    else {
+      String originalReplication = jobConfiguration.get(DFS_REPLICATION_INITIAL);
+      LOG.debug(String.format("Using initial replication factor (%s)", originalReplication));
+      jobConfiguration.set(DFS_REPLICATION, originalReplication);
+    }
+  }
+
+  @VisibleForTesting
+  protected void handleInitialReplication(Configuration jobConfiguration) {
+
+    String origReplication = jobConfiguration.get(DFS_REPLICATION_INITIAL);
+    if (origReplication != null) {
+      LOG.debug(String.format("Initial replication has already been set (%s); nothing to do.", origReplication));
+      return;
+    }
+
+    String defaultReplication = jobConfiguration.get(DFS_REPLICATION);
+
+    if (defaultReplication != null) {
+      LOG.debug(String.format("Using dfs.replication (%s) set by user as initial replication.",
+          defaultReplication));
+      setInitialJobReplicationConfig(jobConfiguration, defaultReplication);
+      return;
+    }
+
+    Set<Target> targets = targetsToNodePaths.keySet();
+    Target t = targets.iterator().next();
+    if (t instanceof FileTargetImpl) {
+      Path path = ((FileTargetImpl) t).getPath();
+      defaultReplication = tryGetDefaultReplicationFromFileSystem(jobConfiguration, path, "3");
+    }
+
+    setInitialJobReplicationConfig(jobConfiguration, defaultReplication);
+  }
+
+  private String tryGetDefaultReplicationFromFileSystem(Configuration jobConf, Path path, String defaultReplication) {
+    String d;
+    try {
+      FileSystem fs = path.getFileSystem(jobConf);
+      d = fs.getConf().get(DFS_REPLICATION);
+      LOG.debug(
+          String.format("Using dfs.replication (%s) retrieved from remote filesystem as initial replication.", d));
+    } catch (IOException e) {
+      d = defaultReplication;
+      LOG.warn(String.format("Cannot read job's config. Setting initial replication to %s.", d));
+    }
+    return d;
+  }
+
+  private void setInitialJobReplicationConfig(Configuration job, String defaultReplication) {
+    job.set(DFS_REPLICATION_INITIAL, defaultReplication);
+  }
+
   private static CrunchControlledJob.Hook getHook(
       CrunchControlledJob.Hook base, List<CrunchControlledJob.Hook> optional) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/e176b616/crunch-core/src/test/java/org/apache/crunch/impl/dist/DistributedPipelineTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/dist/DistributedPipelineTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/dist/DistributedPipelineTest.java
new file mode 100644
index 0000000..40e4bf0
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/dist/DistributedPipelineTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DistributedPipelineTest {
+  private final String testTempDirPath1 = "/tmp/crunch-1345424622/p1";
+  private final String testTempDirPath2 = "/tmp/crunch-1345424622/p2";
+
+  @Mock private Job mockJob;
+  @Mock private Path mockPath;
+  private Configuration testConfiguration = new Configuration();
+
+  @Before
+  public void setUp() {
+    when(mockJob.getConfiguration()).thenReturn(testConfiguration);
+  }
+
+  @Test
+  public void isTempDirFalseWhenCrunchCreatesNoDirs() {
+    boolean isTmp = DistributedPipeline.isTempDir(mockJob, testTempDirPath1);
+    Assert.assertFalse(isTmp);
+  }
+
+  @Test
+  public void isTempDirTrueWhenFileIsInTempDir() {
+    testConfiguration.set("crunch.tmp.dirs", "/tmp/crunch-1345424622/p1");
+    boolean isTmp = DistributedPipeline.isTempDir(mockJob, testTempDirPath1);
+    Assert.assertTrue(isTmp);
+  }
+
+  @Test
+  public void isTempDirFalseWhenFileIsNotInTempDir() {
+    testConfiguration.set("crunch.tmp.dirs", testTempDirPath1);
+    boolean isTmp = DistributedPipeline.isTempDir(mockJob, "/user/crunch/iwTV2/");
+    Assert.assertFalse(isTmp);
+  }
+
+  @Test
+  public void tempDirsAreStoredInPipelineConf() {
+    DistributedPipeline distributedPipeline = Mockito.mock(DistributedPipeline.class, Mockito.CALLS_REAL_METHODS);
+    Configuration testConfiguration = new Configuration();
+    distributedPipeline.setConfiguration(testConfiguration);
+
+    // no temp directory is present at startup
+    Assert.assertEquals(
+        null,
+        distributedPipeline.getConfiguration().get("crunch.tmp.dirs"));
+
+    // store a temp directory
+    distributedPipeline.storeTempDirLocation(new Path(testTempDirPath1));
+    Assert.assertEquals(
+        testTempDirPath1,
+        distributedPipeline.getConfiguration().get("crunch.tmp.dirs"));
+
+    // store one more temp directory
+    distributedPipeline.storeTempDirLocation(new Path(testTempDirPath2));
+    Assert.assertEquals(
+        String.format("%s:%s", testTempDirPath1, testTempDirPath2),
+        distributedPipeline.getConfiguration().get("crunch.tmp.dirs"));
+
+    // storing the first temp directory again must not add a duplicate entry
+    distributedPipeline.storeTempDirLocation(new Path(testTempDirPath1));
+    Assert.assertEquals(
+        String.format("%s:%s", testTempDirPath1, testTempDirPath2),
+        distributedPipeline.getConfiguration().get("crunch.tmp.dirs"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/e176b616/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobPrototypeTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobPrototypeTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobPrototypeTest.java
new file mode 100644
index 0000000..44da01a
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobPrototypeTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import com.google.common.collect.HashMultimap;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+
+@RunWith(MockitoJUnitRunner.class)
+public class JobPrototypeTest {
+
+  public static final String DFS_REPLICATION = "dfs.replication";
+  public static final String TEST_INITIAL_DFS_REPLICATION = "42";
+  public static final String TEST_TMP_DIR_REPLICATION = "1";
+
+  @Mock private Path mockPath;
+  @Mock private FileTargetImpl mockTarget;
+  @Mock private FileSystem mockFs;
+  @Mock private DoNode mockNode;
+  @Mock private PGroupedTableImpl<String, String> mockPgroup;
+  @Mock private Set<NodePath> mockInputs;
+  private JobPrototype jobPrototypeUnderTest;
+  private Configuration testConfiguration = new Configuration();
+
+  @Before
+  public void setUp() {
+    testConfiguration.set("dfs.replication.initial", TEST_INITIAL_DFS_REPLICATION);
+    testConfiguration.set("crunch.tmp.dir.replication", TEST_TMP_DIR_REPLICATION);
+    doReturn(new Object[]{}).when(mockInputs).toArray();
+    jobPrototypeUnderTest = JobPrototype.createMapReduceJob(42,
+        mockPgroup, mockInputs, mockPath);
+  }
+
+  @Test
+  public void userDefinedTmpDirReplicationFactorSetForIntermediateTargets() {
+    jobPrototypeUnderTest.setJobReplication(testConfiguration, true);
+
+    assertEquals(TEST_TMP_DIR_REPLICATION, testConfiguration.get(DFS_REPLICATION));
+  }
+
+  @Test
+  public void initialReplicationFactorSetForLeafOutputTargets() {
+    jobPrototypeUnderTest.setJobReplication(testConfiguration, false);
+
+    assertEquals(TEST_INITIAL_DFS_REPLICATION, testConfiguration.get(DFS_REPLICATION));
+  }
+
+  @Test
+  public void initialReplicationFactorSetIfUserSpecified() {
+    jobPrototypeUnderTest.handleInitialReplication(testConfiguration);
+
+    assertEquals(TEST_INITIAL_DFS_REPLICATION, testConfiguration.get("dfs.replication.initial"));
+  }
+
+  @Test
+  public void initialReplicationFactorUsedFromFileSystem() throws IOException {
+    testConfiguration = new Configuration();
+    HashMultimap<Target, NodePath> targetNodePaths = HashMultimap.create();
+    targetNodePaths.put(mockTarget, new NodePath());
+    doReturn(mockPath).when(mockTarget).getPath();
+    doReturn(mockFs).when(mockPath).getFileSystem(any(Configuration.class));
+    Configuration c = new Configuration();
+    c.set("dfs.replication", TEST_INITIAL_DFS_REPLICATION);
+    doReturn(c).when(mockFs).getConf();
+    jobPrototypeUnderTest.addReducePaths(targetNodePaths);
+
+    jobPrototypeUnderTest.handleInitialReplication(testConfiguration);
+    assertEquals(TEST_INITIAL_DFS_REPLICATION, testConfiguration.get("dfs.replication.initial"));
+  }
+
+  @Test
+  public void initialReplicationFactorUsedWhenItCannotBeRetrievedFromFileSystem() throws IOException {
+    testConfiguration = new Configuration();
+    HashMultimap<Target, NodePath> targetNodePaths = HashMultimap.create();
+    targetNodePaths.put(mockTarget, new NodePath());
+    doReturn(mockPath).when(mockTarget).getPath();
+    doThrow(new IOException()).when(mockPath).getFileSystem(any(Configuration.class));
+    Configuration c = new Configuration();
+    c.set("dfs.replication", TEST_INITIAL_DFS_REPLICATION);
+    jobPrototypeUnderTest.addReducePaths(targetNodePaths);
+
+    jobPrototypeUnderTest.handleInitialReplication(testConfiguration);
+    assertEquals("3", testConfiguration.get("dfs.replication.initial")); // default
+  }
+}
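
For illustration (not part of the commit): a minimal sketch of how a user might enable the new setting, assuming a simple MRPipeline job. The class name and the input/output paths below are hypothetical.

    import org.apache.crunch.PCollection;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.hadoop.conf.Configuration;

    public class TmpReplicationExample {

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Replicate Crunch's temporary/intermediate files only once instead of the
        // cluster default. Jobs that write real (leaf) outputs keep the initial
        // dfs.replication, which the planner records under "dfs.replication.initial".
        conf.set("crunch.tmp.dir.replication", "1");

        MRPipeline pipeline = new MRPipeline(TmpReplicationExample.class, conf);
        PCollection<String> lines = pipeline.readTextFile("/user/alice/input");  // hypothetical path
        // ... multi-stage processing whose intermediate results land in temp
        // directories tracked under "crunch.tmp.dirs" ...
        pipeline.writeTextFile(lines, "/user/alice/output");  // hypothetical path
        pipeline.done();
      }
    }

Per the diff, the override applies per MapReduce job: only jobs whose every output is a Crunch-managed temporary path get the lower replication factor, while jobs writing leaf outputs fall back to the recorded initial value.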

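Likewise for illustration (not from the commit; the paths are made up): the bookkeeping that feeds the per-job decision is a colon-separated list stored under "crunch.tmp.dirs", and DistributedPipeline.isTempDir() does a simple substring match against each stored entry. A self-contained sketch of that matching behavior:

    import org.apache.hadoop.conf.Configuration;

    public class TempDirCheckDemo {

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Two temp dirs, accumulated the same way storeTempDirLocation() does.
        conf.set("crunch.tmp.dirs", "/tmp/crunch-1345424622/p1:/tmp/crunch-1345424622/p2");

        System.out.println(isTempDir(conf, "/tmp/crunch-1345424622/p1/out"));  // true
        System.out.println(isTempDir(conf, "/user/crunch/final-output"));      // false
      }

      // Same logic as the new DistributedPipeline.isTempDir(), minus the Job wrapper.
      static boolean isTempDir(Configuration conf, String outputPath) {
        String tmpDirs = conf.get("crunch.tmp.dirs");
        if (tmpDirs == null) {
          return false;
        }
        for (String p : tmpDirs.split(":")) {
          if (outputPath.contains(p)) {
            return true;
          }
        }
        return false;
      }
    }

Note that because the match is substring-based, any output path that merely contains a stored temp-directory string is treated as temporary; this mirrors the committed logic rather than a stricter prefix match.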