Repository: crunch
Updated Branches:
  refs/heads/master 58ab2fa67 -> 2c9e8306c
CRUNCH-384: Upgrade Spark to 0.9.1 and Scala to 2.10; fix a bunch of things, so that counters and standalone distributed Spark jobs work.

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2c9e8306
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2c9e8306
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2c9e8306

Branch: refs/heads/master
Commit: 2c9e8306caddf7ed687c62802dc63fbd46110a90
Parents: 58ab2fa
Author: Josh Wills <[email protected]>
Authored: Thu Apr 24 17:56:05 2014 -0700
Committer: Josh Wills <[email protected]>
Committed: Wed Apr 30 16:31:12 2014 -0700

----------------------------------------------------------------------
 .../TaskInputOutputContextFactory.java          |  76 ++++++++
 .../org/apache/crunch/SparkUnionResultsIT.java  |   4 +-
 .../impl/spark/CounterAccumulatorParam.java     |  27 ++-
 .../apache/crunch/impl/spark/SparkPipeline.java |   9 +
 .../apache/crunch/impl/spark/SparkRuntime.java  |  48 +++--
 .../crunch/impl/spark/SparkRuntimeContext.java  | 175 ++++++-------------
 .../org/apache/hadoop/mapred/SparkCounter.java  |  76 ++++++++
 pom.xml                                         |  16 +-
 8 files changed, 277 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/2c9e8306/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/TaskInputOutputContextFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/TaskInputOutputContextFactory.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/TaskInputOutputContextFactory.java
new file mode 100644
index 0000000..1aa65b3
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/TaskInputOutputContextFactory.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.StatusReporter; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; + +import java.lang.reflect.Constructor; + +public class TaskInputOutputContextFactory { + private static final Log LOG = LogFactory.getLog(TaskInputOutputContextFactory.class); + + private static final TaskInputOutputContextFactory INSTANCE = new TaskInputOutputContextFactory(); + + public static TaskInputOutputContext create( + Configuration conf, + TaskAttemptID taskAttemptId, + StatusReporter reporter) { + return INSTANCE.createInternal(conf, taskAttemptId, reporter); + } + + private Constructor<? extends TaskInputOutputContext> taskIOConstructor; + private int arity; + + private TaskInputOutputContextFactory() { + String ic = TaskInputOutputContext.class.isInterface() ? + "org.apache.hadoop.mapreduce.task.MapContextImpl" : + "org.apache.hadoop.mapreduce.MapContext"; + try { + Class<? extends TaskInputOutputContext> implClass = (Class<? extends TaskInputOutputContext>) Class.forName(ic); + this.taskIOConstructor = (Constructor<? extends TaskInputOutputContext>) implClass.getConstructor( + Configuration.class, TaskAttemptID.class, RecordReader.class, RecordWriter.class, + OutputCommitter.class, StatusReporter.class, InputSplit.class); + this.arity = 7; + } catch (Exception e) { + LOG.fatal("Could not access TaskInputOutputContext constructor, exiting", e); + } + } + + private TaskInputOutputContext createInternal(Configuration conf, TaskAttemptID taskAttemptId, + StatusReporter reporter) { + Object[] args = new Object[arity]; + args[0] = conf; + args[1] = taskAttemptId; + args[5] = reporter; + try { + return taskIOConstructor.newInstance(args); + } catch (Exception e) { + LOG.error("Could not construct a TaskInputOutputContext instance", e); + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/2c9e8306/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java index 785f45a..4858d6c 100644 --- a/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java +++ b/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java @@ -39,6 +39,7 @@ public class SparkUnionResultsIT extends CrunchTestSupport implements Serializab static class StringLengthMapFn extends MapFn<String, Pair<String, Long>> { @Override public Pair<String, Long> map(String input) { + increment("my", "counter"); return new Pair<String, Long>(input, 10L); } } @@ -93,7 +94,8 @@ public class SparkUnionResultsIT extends CrunchTestSupport implements Serializab PTable<String, Long> set2Counts = pipeline.read(At.textFile(inputPath2, Writables.strings())).count(); PTables.asPTable(set2Counts.union(set1Lengths)).groupByKey().ungroup() .write(At.sequenceFile(output, Writables.strings(), Writables.longs())); - pipeline.done(); + PipelineResult res = pipeline.done(); + assertEquals(4, 
res.getStageResults().get(0).getCounterValue("my", "counter")); } @Test http://git-wip-us.apache.org/repos/asf/crunch/blob/2c9e8306/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java index e1cb5c7..cd2692c 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java @@ -22,24 +22,35 @@ import org.apache.spark.AccumulatorParam; import java.util.Map; -public class CounterAccumulatorParam implements AccumulatorParam<Map<String, Long>> { +public class CounterAccumulatorParam implements AccumulatorParam<Map<String, Map<String, Long>>> { @Override - public Map<String, Long> addAccumulator(Map<String, Long> current, Map<String, Long> added) { - for (Map.Entry<String, Long> e : added.entrySet()) { - Long cnt = current.get(e.getKey()); - cnt = (cnt == null) ? e.getValue() : cnt + e.getValue(); - current.put(e.getKey(), cnt); + public Map<String, Map<String, Long>> addAccumulator( + Map<String, Map<String, Long>> current, + Map<String, Map<String, Long>> added) { + for (Map.Entry<String, Map<String, Long>> e : added.entrySet()) { + Map<String, Long> grp = current.get(e.getKey()); + if (grp == null) { + grp = Maps.newTreeMap(); + current.put(e.getKey(), grp); + } + for (Map.Entry<String, Long> f : e.getValue().entrySet()) { + Long cnt = grp.get(f.getKey()); + cnt = (cnt == null) ? f.getValue() : cnt + f.getValue(); + grp.put(f.getKey(), cnt); + } } return current; } @Override - public Map<String, Long> addInPlace(Map<String, Long> first, Map<String, Long> second) { + public Map<String, Map<String, Long>> addInPlace( + Map<String, Map<String, Long>> first, + Map<String, Map<String, Long>> second) { return addAccumulator(first, second); } @Override - public Map<String, Long> zero(Map<String, Long> counts) { + public Map<String, Map<String, Long>> zero(Map<String, Map<String, Long>> counts) { return Maps.newHashMap(); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/2c9e8306/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java index 49e1d35..05e6e0c 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java @@ -43,11 +43,17 @@ public class SparkPipeline extends DistributedPipeline { private final String sparkConnect; private JavaSparkContext sparkContext; + private Class<?> jarClass; private final Map<PCollection<?>, StorageLevel> cachedCollections = Maps.newHashMap(); public SparkPipeline(String sparkConnect, String appName) { + this(sparkConnect, appName, null); + } + + public SparkPipeline(String sparkConnect, String appName, Class<?> jarClass) { super(appName, new Configuration(), new SparkCollectFactory()); this.sparkConnect = Preconditions.checkNotNull(sparkConnect); + this.jarClass = jarClass; } public SparkPipeline(JavaSparkContext sparkContext, String appName) { @@ -113,6 +119,9 @@ public class SparkPipeline extends 
DistributedPipeline { } if (sparkContext == null) { this.sparkContext = new JavaSparkContext(sparkConnect, getName()); + if (jarClass != null) { + sparkContext.addJar(JavaSparkContext.jarOfClass(jarClass)[0]); + } } SparkRuntime runtime = new SparkRuntime(this, sparkContext, getConfiguration(), outputTargets, toMaterialize, cachedCollections); http://git-wip-us.apache.org/repos/asf/crunch/blob/2c9e8306/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java index ecc7023..2016c50 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java @@ -18,10 +18,11 @@ package org.apache.crunch.impl.spark; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractFuture; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.crunch.CombineFn; import org.apache.crunch.PCollection; import org.apache.crunch.PipelineExecution; @@ -39,12 +40,15 @@ import org.apache.crunch.types.Converter; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; +import org.apache.spark.Accumulator; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDDLike; @@ -54,7 +58,6 @@ import org.apache.spark.storage.StorageLevel; import java.io.IOException; import java.net.URI; import java.util.Comparator; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -65,17 +68,19 @@ import java.util.concurrent.atomic.AtomicReference; public class SparkRuntime extends AbstractFuture<PipelineResult> implements PipelineExecution { + private static final Log LOG = LogFactory.getLog(SparkRuntime.class); + private SparkPipeline pipeline; private JavaSparkContext sparkContext; private Configuration conf; private CombineFn combineFn; private SparkRuntimeContext ctxt; + private Accumulator<Map<String, Map<String, Long>>> counters; private Map<PCollectionImpl<?>, Set<Target>> outputTargets; private Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize; private Map<PCollection<?>, StorageLevel> toCache; private final CountDownLatch doneSignal = new CountDownLatch(1); private AtomicReference<Status> status = new AtomicReference<Status>(Status.READY); - private PipelineResult result; private boolean started; private Thread monitorThread; @@ -103,9 +108,9 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe this.pipeline = pipeline; this.sparkContext = sparkContext; this.conf = conf; - this.ctxt = new SparkRuntimeContext( - sparkContext.broadcast(conf), - 
sparkContext.accumulator(Maps.<String, Long>newHashMap(), new CounterAccumulatorParam())); + this.counters = sparkContext.accumulator(Maps.<String, Map<String, Long>>newHashMap(), + new CounterAccumulatorParam()); + this.ctxt = new SparkRuntimeContext(counters); this.outputTargets = Maps.newTreeMap(DEPTH_COMPARATOR); this.outputTargets.putAll(outputTargets); this.toMaterialize = toMaterialize; @@ -203,13 +208,11 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe for (PCollectionImpl<?> pcollect : outputTargets.keySet()) { targetDeps.put(pcollect, pcollect.getTargetDependencies()); } - while (!targetDeps.isEmpty() && doneSignal.getCount() > 0) { Set<Target> allTargets = Sets.newHashSet(); for (PCollectionImpl<?> pcollect : targetDeps.keySet()) { allTargets.addAll(outputTargets.get(pcollect)); } - Map<PCollectionImpl<?>, JavaRDDLike<?, ?>> pcolToRdd = Maps.newTreeMap(DEPTH_COMPARATOR); for (PCollectionImpl<?> pcollect : targetDeps.keySet()) { if (Sets.intersection(allTargets, targetDeps.get(pcollect)).isEmpty()) { @@ -227,6 +230,7 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe } for (Target t : targets) { Configuration conf = new Configuration(getConfiguration()); + getRuntimeContext().setConf(sparkContext.broadcast(WritableUtils.toByteArray(conf))); if (t instanceof MapReduceTarget) { //TODO: check this earlier Converter c = t.getConverter(ptype); JavaPairRDD<?, ?> outRDD; @@ -239,7 +243,6 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe .map(new PairMapFunction(ptype.getOutputMapFn(), ctxt)) .map(new OutputConverterFunction(c)); } - try { Job job = new Job(conf); if (t instanceof PathTarget) { @@ -281,16 +284,26 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe } if (status.get() != Status.FAILED || status.get() != Status.KILLED) { status.set(Status.SUCCEEDED); - result = new PipelineResult( - ImmutableList.of(new PipelineResult.StageResult("Spark", null, start, System.currentTimeMillis())), - Status.SUCCEEDED); - set(result); + set(new PipelineResult( + ImmutableList.of(new PipelineResult.StageResult("Spark", getCounters(), start, System.currentTimeMillis())), + Status.SUCCEEDED)); } else { set(PipelineResult.EMPTY); } doneSignal.countDown(); } + private Counters getCounters() { + Counters c = new Counters(); + for (Map.Entry<String, Map<String, Long>> e : counters.value().entrySet()) { + CounterGroup cg = c.getGroup(e.getKey()); + for (Map.Entry<String, Long> f : e.getValue().entrySet()) { + cg.findCounter(f.getKey()).setValue(f.getValue()); + } + } + return c; + } + @Override public PipelineResult get() throws InterruptedException, ExecutionException { if (getStatus() == Status.READY) { @@ -315,7 +328,12 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe @Override public PipelineResult getResult() { - return result; + try { + return get(); + } catch (Exception e) { + LOG.error("Exception retrieving PipelineResult, returning EMPTY", e); + return PipelineResult.EMPTY; + } } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/2c9e8306/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java index 78436c2..102ad4a 100644 --- 
a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java @@ -18,19 +18,15 @@ package org.apache.crunch.impl.spark; import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; -import javassist.util.proxy.MethodFilter; -import javassist.util.proxy.MethodHandler; -import javassist.util.proxy.ProxyFactory; +import com.google.common.collect.Maps; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.DoFn; +import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.TaskInputOutputContextFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.mapred.SparkCounter; import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.StatusReporter; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskInputOutputContext; @@ -38,32 +34,35 @@ import org.apache.spark.Accumulator; import org.apache.spark.SparkFiles; import org.apache.spark.broadcast.Broadcast; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.File; import java.io.IOException; import java.io.Serializable; -import java.lang.reflect.Method; import java.net.URI; import java.util.List; import java.util.Map; -import java.util.Set; public class SparkRuntimeContext implements Serializable { - private Broadcast<Configuration> broadConf; - private Accumulator<Map<String, Long>> counters; + private Broadcast<byte[]> broadConf; + private final Accumulator<Map<String, Map<String, Long>>> counters; + private transient Configuration conf; private transient TaskInputOutputContext context; - public SparkRuntimeContext( - Broadcast<Configuration> broadConf, - Accumulator<Map<String, Long>> counters) { - this.broadConf = broadConf; + public SparkRuntimeContext(Accumulator<Map<String, Map<String, Long>>> counters) { this.counters = counters; } + public void setConf(Broadcast<byte[]> broadConf) { + this.broadConf = broadConf; + } + public void initialize(DoFn<?, ?> fn) { if (context == null) { configureLocalFiles(); - context = getTaskIOContext(broadConf, counters); + context = TaskInputOutputContextFactory.create(getConfiguration(), new TaskAttemptID(), + new SparkReporter(counters)); } fn.setContext(context); fn.initialize(); @@ -76,7 +75,6 @@ public class SparkRuntimeContext implements Serializable { List<String> allFiles = Lists.newArrayList(); for (URI uri : uris) { File f = new File(uri.getPath()); - String sparkFile = SparkFiles.get(f.getName()); allFiles.add(SparkFiles.get(f.getName())); } String sparkFiles = Joiner.on(',').join(allFiles); @@ -90,117 +88,60 @@ public class SparkRuntimeContext implements Serializable { } public Configuration getConfiguration() { - return broadConf.value(); + if (conf == null) { + conf = new Configuration(); + try { + ByteArrayInputStream bais = new ByteArrayInputStream(broadConf.value()); + conf.readFields(new DataInputStream(bais)); + bais.close(); + } catch (Exception e) { + throw new RuntimeException("Error reading broadcast configuration", e); + } + } + return conf; } - public static TaskInputOutputContext getTaskIOContext( - final Broadcast<Configuration> conf, - final Accumulator<Map<String, Long>> counters) { - 
ProxyFactory factory = new ProxyFactory(); - Class<TaskInputOutputContext> superType = TaskInputOutputContext.class; - Class[] types = new Class[0]; - Object[] args = new Object[0]; - final TaskAttemptID taskAttemptId = new TaskAttemptID(); - if (superType.isInterface()) { - factory.setInterfaces(new Class[] { superType }); - } else { - types = new Class[] { Configuration.class, TaskAttemptID.class, RecordWriter.class, OutputCommitter.class, - StatusReporter.class }; - args = new Object[] { conf.value(), taskAttemptId, null, null, null }; - factory.setSuperclass(superType); + private static class SparkReporter extends StatusReporter implements Serializable { + + Accumulator<Map<String, Map<String, Long>>> accum; + private transient Map<String, Map<String, Counter>> counters; + + public SparkReporter(Accumulator<Map<String, Map<String, Long>>> accum) { + this.accum = accum; + this.counters = Maps.newHashMap(); + } + + @Override + public Counter getCounter(Enum<?> anEnum) { + return getCounter(anEnum.getDeclaringClass().toString(), anEnum.name()); } - final Set<String> handledMethods = ImmutableSet.of("getConfiguration", "getCounter", - "progress", "getTaskAttemptID"); - factory.setFilter(new MethodFilter() { - @Override - public boolean isHandled(Method m) { - return handledMethods.contains(m.getName()); + @Override + public Counter getCounter(String group, String name) { + Map<String, Counter> grp = counters.get(group); + if (grp == null) { + grp = Maps.newTreeMap(); + counters.put(group, grp); } - }); - MethodHandler handler = new MethodHandler() { - @Override - public Object invoke(Object arg0, Method m, Method arg2, Object[] args) throws Throwable { - String name = m.getName(); - if ("getConfiguration".equals(name)) { - return conf.value(); - } else if ("progress".equals(name)) { - // no-op - return null; - } else if ("getTaskAttemptID".equals(name)) { - return taskAttemptId; - } else if ("getCounter".equals(name)){ // getCounter - if (args.length == 1) { - return getCounter(counters, args[0].getClass().getName(), ((Enum) args[0]).name()); - } else { - return getCounter(counters, (String) args[0], (String) args[1]); - } - } else { - throw new IllegalStateException("Unhandled method " + name); - } + if (!grp.containsKey(name)) { + grp.put(name, new SparkCounter(group, name, accum)); } - }; + return grp.get(name); + } - try { - Object newInstance = factory.create(types, args, handler); - return (TaskInputOutputContext<?, ?, ?, ?>) newInstance; - } catch (Exception e) { - e.printStackTrace(); - throw new RuntimeException(e); + @Override + public void progress() { } - } - private static Counter getCounter(final Accumulator<Map<String, Long>> accum, final String group, - final String counterName) { - ProxyFactory factory = new ProxyFactory(); - Class<Counter> superType = Counter.class; - Class[] types = new Class[0]; - Object[] args = new Object[0]; - if (superType.isInterface()) { - factory.setInterfaces(new Class[] { superType }); - } else { - types = new Class[] { String.class, String.class }; - args = new Object[] { group, counterName }; - factory.setSuperclass(superType); + @Override + public float getProgress() { + return 0; } - final Set<String> handledMethods = ImmutableSet.of("getDisplayName", "getName", - "getValue", "increment", "setValue", "setDisplayName"); - factory.setFilter(new MethodFilter() { - @Override - public boolean isHandled(Method m) { - return handledMethods.contains(m.getName()); - } - }); - MethodHandler handler = new MethodHandler() { - @Override - public Object 
invoke(Object arg0, Method m, Method arg2, Object[] args) throws Throwable { - String name = m.getName(); - if ("increment".equals(name)) { - accum.add(ImmutableMap.of(group + ":" + counterName, (Long) args[0])); - return null; - } else if ("getDisplayName".equals(name)) { - return counterName; - } else if ("getName".equals(name)) { - return counterName; - } else if ("setDisplayName".equals(name)) { - // No-op - return null; - } else if ("setValue".equals(name)) { - throw new UnsupportedOperationException("Cannot set counter values in Spark, only increment them"); - } else if ("getValue".equals(name)) { - throw new UnsupportedOperationException("Cannot read counters during Spark execution"); - } else { - throw new IllegalStateException("Unhandled method " + name); - } - } - }; - try { - Object newInstance = factory.create(types, args, handler); - return (Counter) newInstance; - } catch (Exception e) { - e.printStackTrace(); - throw new RuntimeException(e); + @Override + public void setStatus(String s) { + } } + } http://git-wip-us.apache.org/repos/asf/crunch/blob/2c9e8306/crunch-spark/src/main/java/org/apache/hadoop/mapred/SparkCounter.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/hadoop/mapred/SparkCounter.java b/crunch-spark/src/main/java/org/apache/hadoop/mapred/SparkCounter.java new file mode 100644 index 0000000..4964a55 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/hadoop/mapred/SparkCounter.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred; + +import com.google.common.collect.ImmutableMap; +import org.apache.spark.Accumulator; + +import java.util.Map; + +public class SparkCounter extends Counters.Counter { + + private String group; + private String name; + private long value = 0; + private Accumulator<Map<String, Map<String, Long>>> accum; + + public SparkCounter(String group, String name, Accumulator<Map<String, Map<String, Long>>> accum) { + this.group = group; + this.name = name; + this.accum = accum; + } + + public SparkCounter(String group, String name, long value) { + this.group = group; + this.name = name; + this.value = value; + } + + @Override + public String getName() { + return name; + } + + @Override + public String getDisplayName() { + return name; + } + + @Override + public long getValue() { + return value; + } + + @Override + public long getCounter() { + return getValue(); + } + + @Override + public void increment(long inc) { + this.value += inc; + accum.add(ImmutableMap.<String, Map<String, Long>>of(group, ImmutableMap.of(name, inc))); + } + + @Override + public void setValue(long newValue) { + long delta = newValue - value; + accum.add(ImmutableMap.<String, Map<String, Long>>of(group, ImmutableMap.of(name, delta))); + this.value = newValue; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/2c9e8306/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5e9516c..aafe113 100644 --- a/pom.xml +++ b/pom.xml @@ -91,11 +91,10 @@ under the License. <hbase.version>0.96.0-hadoop1</hbase.version> <hbase.midfix>hadoop1</hbase.midfix> - <!-- Can be overridden by the scala-2.10 profile, but these are the default values --> - <scala.base.version>2.9.3</scala.base.version> - <scala.version>2.9.3</scala.version> + <scala.base.version>2.10</scala.base.version> + <scala.version>2.10.4</scala.version> <scalatest.version>1.9.1</scalatest.version> - <spark.version>0.8.1-incubating</spark.version> + <spark.version>0.9.1</spark.version> </properties> <scm> @@ -498,15 +497,6 @@ under the License. </dependencies> </dependencyManagement> </profile> - <profile> - <id>scala-2.10</id> - <properties> - <scala.base.version>2.10</scala.base.version> - <scala.version>2.10.3</scala.version> - <scalatest.version>2.1.0</scalatest.version> - <spark.version>0.9.0-incubating</spark.version> - </properties> - </profile> </profiles> <build>

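Editor's note: taken together, these changes let a Crunch user run a counter-bearing pipeline against a standalone Spark master. The new three-argument SparkPipeline constructor ships the jar containing the job classes to the executors, DoFn.increment() is routed through SparkCounter into the shared accumulator, and pipeline.done() returns a PipelineResult whose single "Spark" stage carries the aggregated counter values. The following is a minimal usage sketch rather than code from the commit; the master URL, file paths, the CountingExample and NonEmptyFn class names, and the counter group and name are illustrative placeholders.

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.crunch.types.writable.Writables;

public class CountingExample {

  // Increments a user-defined counter for every non-empty line it sees.
  // On Spark the increment now flows through SparkCounter into the
  // counter accumulator instead of a proxied TaskInputOutputContext.
  static class NonEmptyFn extends DoFn<String, String> {
    @Override
    public void process(String line, Emitter<String> emitter) {
      if (!line.isEmpty()) {
        increment("example", "nonEmptyLines");
        emitter.emit(line);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    // The jarClass argument tells the pipeline which jar to addJar() to the
    // SparkContext, which is what makes standalone (non-local) masters work.
    Pipeline pipeline = new SparkPipeline("spark://master:7077", "counting-example",
        CountingExample.class);

    PCollection<String> lines = pipeline.readTextFile("/tmp/counting-input");
    PCollection<String> nonEmpty = lines.parallelDo(new NonEmptyFn(), Writables.strings());
    pipeline.writeTextFile(nonEmpty, "/tmp/counting-output");

    // The "Spark" stage result is now populated with the accumulated counters.
    PipelineResult result = pipeline.done();
    long nonEmptyLines = result.getStageResults().get(0).getCounterValue("example", "nonEmptyLines");
    System.out.println("non-empty lines: " + nonEmptyLines);
  }
}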
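A related detail worth calling out: the runtime no longer broadcasts a live Configuration object but its Writable byte form (presumably because Configuration is Writable rather than java.io.Serializable), and SparkRuntimeContext rebuilds the Configuration lazily on the executor side. This small standalone sketch, assuming only the Hadoop classes already used in the patch, shows that round trip; the property key and value are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableUtils;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;

public class ConfRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration original = new Configuration();
    original.set("example.key", "example-value"); // hypothetical property, for illustration only

    // Driver side (SparkRuntime): serialize the Configuration to its Writable
    // byte form before handing it to sparkContext.broadcast().
    byte[] payload = WritableUtils.toByteArray(original);

    // Executor side (SparkRuntimeContext.getConfiguration()): rebuild the
    // Configuration from the broadcast bytes on first use.
    Configuration rebuilt = new Configuration();
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
    rebuilt.readFields(in);
    in.close();

    System.out.println(rebuilt.get("example.key")); // prints "example-value"
  }
}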