Repository: crunch Updated Branches: refs/heads/master 61f98eea9 -> d0bb205ea
CRUNCH-520: Coverity scan inspection fixes Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d0bb205e Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d0bb205e Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d0bb205e Branch: refs/heads/master Commit: d0bb205eaf236bf1d81b9070a2db3428cb90a161 Parents: 61f98ee Author: Sean Owen <[email protected]> Authored: Wed May 20 09:13:24 2015 +0100 Committer: Josh Wills <[email protected]> Committed: Wed May 20 09:31:30 2015 -0700 ---------------------------------------------------------------------- .../contrib/bloomfilter/BloomFiltersIT.java | 7 +++-- .../java/org/apache/crunch/ConfigurationIT.java | 2 +- .../it/java/org/apache/crunch/MapPObjectIT.java | 6 ++-- .../java/org/apache/crunch/MaterializeIT.java | 6 ++-- .../org/apache/crunch/PTableKeyValueIT.java | 3 +- .../org/apache/crunch/PipelineCallableIT.java | 4 +-- .../crunch/SingleUseIterableExceptionIT.java | 4 +-- .../apache/crunch/StageResultsCountersIT.java | 2 +- .../crunch/impl/mem/MemPipelineUTF8IT.java | 6 +--- .../java/org/apache/crunch/io/ToolRunnerIT.java | 4 ++- .../java/org/apache/crunch/lib/AggregateIT.java | 5 +++ .../crunch/lib/join/MapsideJoinStrategyIT.java | 5 +-- .../java/org/apache/crunch/fn/Aggregators.java | 3 +- .../lib/jobcontrol/CrunchJobControl.java | 32 ++++++++++++-------- .../apache/crunch/impl/mem/CountersWrapper.java | 3 +- .../org/apache/crunch/io/CrunchOutputs.java | 9 ++---- .../java/org/apache/crunch/lib/Quantiles.java | 4 +-- .../crunch/types/writable/TupleWritable.java | 6 ++-- .../writable/WritableGroupedTableType.java | 2 +- .../org/apache/crunch/util/PartitionUtils.java | 2 ++ .../lib/jobcontrol/CrunchJobControlTest.java | 2 +- .../crunch/impl/SingleUseIterableTest.java | 8 ++--- .../writable/GenericArrayWritableTest.java | 17 +++++++---- .../crunch/types/writable/WritablesTest.java | 5 +++ .../crunch/examples/WordAggregationHBase.java | 16 ++++++---- .../apache/crunch/io/hbase/HFileTargetIT.java | 8 +++-- .../crunch/io/hbase/WordCountHBaseIT.java | 24 +++++++++------ .../org/apache/crunch/io/hbase/HBaseData.java | 5 +-- .../org/apache/crunch/io/orc/OrcWritable.java | 5 +++ .../apache/crunch/SparkPipelineCallableIT.java | 4 +-- .../apache/crunch/impl/spark/SparkRuntime.java | 4 +-- 31 files changed, 121 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-contrib/src/it/java/org/apache/crunch/contrib/bloomfilter/BloomFiltersIT.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/it/java/org/apache/crunch/contrib/bloomfilter/BloomFiltersIT.java b/crunch-contrib/src/it/java/org/apache/crunch/contrib/bloomfilter/BloomFiltersIT.java index d91e07f..c18c8c4 100644 --- a/crunch-contrib/src/it/java/org/apache/crunch/contrib/bloomfilter/BloomFiltersIT.java +++ b/crunch-contrib/src/it/java/org/apache/crunch/contrib/bloomfilter/BloomFiltersIT.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.io.Serializable; +import java.nio.charset.Charset; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; @@ -46,7 +47,7 @@ public class BloomFiltersIT extends CrunchTestSupport implements Serializable { List<String> parts = Arrays.asList(StringUtils.split(input, " ")); Collection<Key> keys = new HashSet<Key>(); for (String stringpart : parts) { - keys.add(new Key(stringpart.getBytes())); + keys.add(new Key(stringpart.getBytes(Charset.forName("UTF-8")))); } return keys; } @@ -54,8 +55,8 @@ public class BloomFiltersIT extends CrunchTestSupport implements Serializable { Map<String, BloomFilter> filterValues = BloomFilterFactory.createFilter(new Path(inputPath), filterFn).getValue(); assertEquals(1, filterValues.size()); BloomFilter filter = filterValues.get("shakes.txt"); - assertTrue(filter.membershipTest(new Key("Mcbeth".getBytes()))); - assertTrue(filter.membershipTest(new Key("apples".getBytes()))); + assertTrue(filter.membershipTest(new Key("Mcbeth".getBytes(Charset.forName("UTF-8"))))); + assertTrue(filter.membershipTest(new Key("apples".getBytes(Charset.forName("UTF-8"))))); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java b/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java index 0f65d8f..52ba2b6 100644 --- a/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/ConfigurationIT.java @@ -19,13 +19,13 @@ */ package org.apache.crunch; -import junit.framework.Assert; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.From; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/MapPObjectIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/MapPObjectIT.java b/crunch-core/src/it/java/org/apache/crunch/MapPObjectIT.java index c48284f..635efdf 100644 --- a/crunch-core/src/it/java/org/apache/crunch/MapPObjectIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/MapPObjectIT.java @@ -17,7 +17,7 @@ */ package org.apache.crunch; -import static junit.framework.Assert.assertEquals; +import static org.junit.Assert.assertEquals; import java.io.IOException; import java.util.Map; @@ -39,8 +39,8 @@ public class MapPObjectIT { Pair.of(2, "c"), Pair.of(3, "e")); public void assertMatches(Map<Integer, String> m) { - for (Integer k : m.keySet()) { - assertEquals(kvPairs.get(k).second(), m.get(k)); + for (Map.Entry<Integer, String> e : m.entrySet()) { + assertEquals(kvPairs.get(e.getKey()).second(), e.getValue()); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java index 7bc61df..455b943 100644 --- a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java @@ -17,9 +17,6 @@ */ package org.apache.crunch; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertTrue; - import java.io.IOException; import java.util.List; @@ -42,6 +39,9 @@ import org.junit.Test; import com.google.common.collect.Lists; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class MaterializeIT { @Rule http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java b/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java index d56e122..a8a387b 100644 --- a/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/PTableKeyValueIT.java @@ -23,8 +23,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import junit.framework.Assert; - import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.At; import org.apache.crunch.test.TemporaryPath; @@ -32,6 +30,7 @@ import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroTypeFamily; import org.apache.crunch.types.writable.WritableTypeFamily; +import org.junit.Assert; import org.junit.After; import org.junit.Before; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java b/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java index b4fc19e..95638a1 100644 --- a/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java @@ -26,8 +26,8 @@ import org.junit.Test; import java.util.Map; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertFalse; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; public class PipelineCallableIT { @Rule http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java b/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java index ff2897b..8d070cd 100644 --- a/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/SingleUseIterableExceptionIT.java @@ -17,8 +17,6 @@ */ package org.apache.crunch; -import java.util.Iterator; - import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.To; import org.apache.crunch.test.TemporaryPath; @@ -35,7 +33,7 @@ public class SingleUseIterableExceptionIT { static class ReduceFn extends MapFn<Iterable<String>, String> { @Override public String map(Iterable<String> input) { - Iterator<String> iter = input.iterator(); + input.iterator(); throw new CrunchRuntimeException("Exception"); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java b/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java index 04711e4..e74c166 100644 --- a/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/StageResultsCountersIT.java @@ -17,9 +17,9 @@ */ package org.apache.crunch; -import static junit.framework.Assert.assertTrue; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import java.util.HashSet; import java.util.List; http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java index 56b167a..3e74cdd 100644 --- a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java +++ b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java @@ -18,15 +18,10 @@ package org.apache.crunch.impl.mem; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.nio.charset.Charset; import com.google.common.base.Charsets; import com.google.common.io.Files; -import junit.framework.Assert; import org.apache.crunch.PCollection; import org.apache.crunch.Pipeline; @@ -36,6 +31,7 @@ import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.text.TextFileTarget; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java b/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java index 287ba93..57bb5fa 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/ToolRunnerIT.java @@ -17,6 +17,8 @@ */ package org.apache.crunch.io; +import java.nio.charset.Charset; + import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.Pair; @@ -205,7 +207,7 @@ public class ToolRunnerIT { @Override public Pair<BytesWritable, BytesWritable> map(String input) { - BytesWritable bw = new BytesWritable(input.getBytes()); + BytesWritable bw = new BytesWritable(input.getBytes(Charset.forName("UTF-8"))); return Pair.of(bw, bw); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java index 1408c73..5675de8 100644 --- a/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java @@ -248,5 +248,10 @@ public class AggregateIT { return true; } + @Override + public int hashCode() { + return value == null ? 0 : value.hashCode(); + } + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java index f9caa3a..1917038 100644 --- a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.io.OutputStream; +import java.nio.charset.Charset; import java.util.Collections; import java.util.List; import java.util.Locale; @@ -159,7 +160,7 @@ public class MapsideJoinStrategyIT { public void testLegacyMapsideJoin_LeftSideIsEmpty() throws IOException { MRPipeline pipeline = new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()); PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt"); - PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt"); + readTable(pipeline, "orders.txt"); PTable<Integer, String> filteredCustomerTable = customerTable .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), customerTable.getPTableType()); @@ -235,7 +236,7 @@ public class MapsideJoinStrategyIT { OutputStream out2 = fs.create(path2, true); for(int i = 0; i < 4; i++){ - byte[] value = ("value" + i + "\n").getBytes(); + byte[] value = ("value" + i + "\n").getBytes(Charset.forName("UTF-8")); out1.write(value); out2.write(value); } http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java index 5a9c157..62ee089 100644 --- a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java +++ b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java @@ -1056,7 +1056,8 @@ public final class Aggregators { @Override public void update(final String next) { long length = (next == null) ? 0 : next.length() + separator.length(); - if (maxOutputLength > 0 && currentLength + length > maxOutputLength || maxInputLength > 0 && next.length() > maxInputLength) { + if ((maxOutputLength > 0 && currentLength + length > maxOutputLength) || + (maxInputLength > 0 && next != null && next.length() > maxInputLength)) { return; } if (maxOutputLength > 0) { http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java index aac6296..62147ad 100644 --- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java +++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -156,17 +155,26 @@ public class CrunchJobControl { } private Map<Integer, CrunchControlledJob> getQueue(State state) { - Map<Integer, CrunchControlledJob> retv = null; - if (state == State.WAITING) { - retv = this.waitingJobs; - } else if (state == State.READY) { - retv = this.readyJobs; - } else if (state == State.RUNNING) { - retv = this.runningJobs; - } else if (state == State.SUCCESS) { - retv = this.successfulJobs; - } else if (state == State.FAILED || state == State.DEPENDENT_FAILED) { - retv = this.failedJobs; + Map<Integer, CrunchControlledJob> retv; + switch (state) { + case WAITING: + retv = this.waitingJobs; + break; + case READY: + retv = this.readyJobs; + break; + case RUNNING: + retv = this.runningJobs; + break; + case SUCCESS: + retv = this.successfulJobs; + break; + case FAILED: + case DEPENDENT_FAILED: + retv = this.failedJobs; + break; + default: + throw new IllegalArgumentException("Unknown state " + state); } return retv; } http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java index ee0906b..7312402 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java @@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.CounterGroup; import org.apache.hadoop.mapreduce.Counters; -import javax.annotation.Nullable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -87,7 +86,7 @@ class CountersWrapper extends Counters { public Iterator<CounterGroup> iterator() { return Iterators.concat(Iterables.transform(allCounters, new Function<Counters, Iterator<CounterGroup>>() { @Override - public Iterator<CounterGroup> apply(@Nullable Counters input) { + public Iterator<CounterGroup> apply(Counters input) { return input.iterator(); } }).iterator()); http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java index 0d06931..247ac08 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java @@ -195,13 +195,8 @@ public class CrunchOutputs<K, V> { job = getJob(job.getJobID(), namedOutput,baseConf); OutputFormat<K, V> fmt = getOutputFormat(namedOutput, job, namedOutputs.get(namedOutput)); - TaskAttemptContext taskContext = null; - RecordWriter<K, V> recordWriter = null; - - if (baseContext != null) { - taskContext = getTaskContext(baseContext, job); - recordWriter = fmt.getRecordWriter(taskContext); - } + TaskAttemptContext taskContext = getTaskContext(baseContext, job); + RecordWriter<K, V> recordWriter = fmt.getRecordWriter(taskContext); OutputState<K, V> outputState = new OutputState(taskContext, recordWriter); this.outputStates.put(namedOutput, outputState); return outputState; http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java b/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java index d6fc454..4262c58 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java @@ -30,8 +30,6 @@ import org.apache.crunch.Pair; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; -import javax.annotation.Nullable; - import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -173,7 +171,7 @@ public class Quantiles { Iterator<V> valueIterator = Iterators.transform(iterator, new Function<Pair<V, Long>, V>() { @Override - public V apply(@Nullable Pair<V, Long> input) { + public V apply(Pair<V, Long> input) { return input.first(); } }); http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java index bdd3ad9..068b0af 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/TupleWritable.java @@ -334,8 +334,10 @@ public class TupleWritable extends Configured implements WritableComparable<Tupl int cmp = WritableComparator.get(clazz.asSubclass(WritableComparable.class)).compare( buffer1.getData(), buffer1.getPosition(), bodySize1, buffer2.getData(), buffer2.getPosition(), bodySize2); - buffer1.skip(bodySize1); - buffer2.skip(bodySize2); + long skipped1 = buffer1.skip(bodySize1); + long skipped2 = buffer2.skip(bodySize2); + Preconditions.checkState(skipped1 == bodySize1); + Preconditions.checkState(skipped2 == bodySize2); return cmp; } else { // fallback to deserialization http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java index c25345b..c251905 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java @@ -94,7 +94,7 @@ class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> { WritableType valueType = (WritableType) tableType.getValueType(); job.setMapOutputKeyClass(keyType.getSerializationClass()); job.setMapOutputValueClass(valueType.getSerializationClass()); - if (options.getSortComparatorClass() == null && + if ((options == null || options.getSortComparatorClass() == null) && TupleWritable.class.equals(keyType.getSerializationClass())) { job.setSortComparatorClass(TupleWritable.Comparator.class); } http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java index cdcc401..fbd4ebd 100644 --- a/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java +++ b/crunch-core/src/main/java/org/apache/crunch/util/PartitionUtils.java @@ -17,6 +17,7 @@ */ package org.apache.crunch.util; +import com.google.common.base.Preconditions; import org.apache.crunch.PCollection; import org.apache.hadoop.conf.Configuration; @@ -42,6 +43,7 @@ public class PartitionUtils { public static <T> int getRecommendedPartitions(PCollection<T> pcollection, Configuration conf) { long bytesPerTask = conf.getLong(BYTES_PER_REDUCE_TASK, DEFAULT_BYTES_PER_REDUCE_TASK); + Preconditions.checkArgument(bytesPerTask > 0); int recommended = 1 + (int) (pcollection.getSize() / bytesPerTask); int maxRecommended = conf.getInt(MAX_REDUCERS, DEFAULT_MAX_REDUCERS); if (maxRecommended > 0 && recommended > maxRecommended) { http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java index e727ec1..fa226b4 100644 --- a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java @@ -72,7 +72,7 @@ public class CrunchJobControlTest { verify(job3).submit(); } - private class IncrementingPipelineCallable extends PipelineCallable<Void> { + private static class IncrementingPipelineCallable extends PipelineCallable<Void> { private String name; private boolean executed; http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java index 811a0a3..d1e530a 100644 --- a/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/impl/SingleUseIterableTest.java @@ -44,11 +44,9 @@ public class SingleUseIterableTest { SingleUseIterable<Integer> iterable = new SingleUseIterable<Integer>(values); - List<Integer> retrievedValues = Lists.newArrayList(iterable); - - for (Integer n : iterable) { - - } + // Consume twice + Lists.newArrayList(iterable); + Lists.newArrayList(iterable); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java index c446a69..481086b 100644 --- a/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java @@ -23,12 +23,11 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.sameInstance; import static org.junit.Assert.assertThat; +import java.nio.charset.Charset; import java.util.Arrays; import org.apache.crunch.test.Tests; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.junit.Test; @@ -47,27 +46,33 @@ public class GenericArrayWritableTest { @Test public void testNonEmpty() { GenericArrayWritable src = new GenericArrayWritable(); - src.set(new BytesWritable[] { new BytesWritable("foo".getBytes()), new BytesWritable("bar".getBytes()) }); + src.set(new BytesWritable[] { + new BytesWritable("foo".getBytes(Charset.forName("UTF-8"))), + new BytesWritable("bar".getBytes(Charset.forName("UTF-8"))) }); GenericArrayWritable dest = Tests.roundtrip(src, new GenericArrayWritable()); assertThat(src.get(), not(sameInstance(dest.get()))); assertThat(dest.get().length, is(2)); assertThat(Arrays.asList(dest.get()), - hasItems(new BytesWritable("foo".getBytes()), new BytesWritable("bar".getBytes()))); + hasItems(new BytesWritable("foo".getBytes(Charset.forName("UTF-8"))), + new BytesWritable("bar".getBytes(Charset.forName("UTF-8"))))); } @Test public void testNulls() { GenericArrayWritable src = new GenericArrayWritable(); - src.set(new BytesWritable[] { new BytesWritable("a".getBytes()), null, new BytesWritable("b".getBytes()) }); + src.set(new BytesWritable[] { + new BytesWritable("a".getBytes(Charset.forName("UTF-8"))), null, + new BytesWritable("b".getBytes(Charset.forName("UTF-8"))) }); GenericArrayWritable dest = Tests.roundtrip(src, new GenericArrayWritable()); assertThat(src.get(), not(sameInstance(dest.get()))); assertThat(dest.get().length, is(3)); assertThat(Arrays.asList(dest.get()), - hasItems(new BytesWritable("a".getBytes()), new BytesWritable("b".getBytes()), null)); + hasItems(new BytesWritable("a".getBytes(Charset.forName("UTF-8"))), + new BytesWritable("b".getBytes(Charset.forName("UTF-8"))), null)); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java index 2281473..9af3dea 100644 --- a/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/types/writable/WritablesTest.java @@ -208,6 +208,11 @@ public class WritablesTest { } @Override + public int hashCode() { + return (left == null ? 0 : left.hashCode()) ^ right; + } + + @Override public int compareTo(TestWritable o) { int cmp = left.compareTo(o.left); if (cmp != 0) http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java ---------------------------------------------------------------------- diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java b/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java index b2d24f8..5d62d19 100644 --- a/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java +++ b/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java @@ -168,13 +168,17 @@ public class WordAggregationHBase extends Configured implements Tool, Serializab private static void createTable(Configuration conf, String htableName, String... families) throws MasterNotRunningException, ZooKeeperConnectionException, IOException { HBaseAdmin hbase = new HBaseAdmin(conf); - if (!hbase.tableExists(htableName)) { - HTableDescriptor desc = new HTableDescriptor(htableName); - for (String s : families) { - HColumnDescriptor meta = new HColumnDescriptor(s); - desc.addFamily(meta); + try { + if (!hbase.tableExists(htableName)) { + HTableDescriptor desc = new HTableDescriptor(htableName); + for (String s : families) { + HColumnDescriptor meta = new HColumnDescriptor(s); + desc.addFamily(meta); + } + hbase.createTable(desc); } - hbase.createTable(desc); + } finally { + hbase.close(); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java index 7d8ae83..71cf31f 100644 --- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java @@ -34,7 +34,6 @@ import org.apache.crunch.PipelineResult; import org.apache.crunch.fn.FilterFns; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.At; -import org.apache.crunch.lib.Sort; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.writable.Writables; @@ -72,6 +71,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.Serializable; +import java.nio.charset.Charset; import java.util.List; import java.util.Map; import java.util.Random; @@ -112,7 +112,7 @@ public class HFileTargetIT implements Serializable { // probably created using this process' umask. So we guess the temp dir permissions as // 0777 & ~umask, and use that to set the config value. Process process = Runtime.getRuntime().exec("/bin/sh -c umask"); - BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream())); + BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream(), Charset.forName("UTF-8"))); int rc = process.waitFor(); if(rc == 0) { String umask = br.readLine(); @@ -282,7 +282,9 @@ public class HFileTargetIT implements Serializable { reader = HFile.createReader(fs, f, new CacheConfig(conf), conf); assertEquals(DataBlockEncoding.PREFIX, reader.getDataBlockEncoding()); } finally { - reader.close(); + if (reader != null) { + reader.close(); + } } hfilesCount++; } http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java index de7b287..dd48352 100644 --- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.nio.charset.Charset; import java.util.Map; import java.util.Random; @@ -40,7 +41,6 @@ import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; @@ -65,7 +65,8 @@ public class WordCountHBaseIT { byte[] firstStrBytes = input.second().first().getValue(WORD_COLFAM, null); byte[] secondStrBytes = input.second().second().getValue(WORD_COLFAM, null); if (firstStrBytes != null && secondStrBytes != null) { - return Joiner.on(',').join(new String(firstStrBytes), new String(secondStrBytes)); + return Joiner.on(',').join(new String(firstStrBytes, Charset.forName("UTF-8")), + new String(secondStrBytes, Charset.forName("UTF-8"))); } return ""; } @@ -137,7 +138,7 @@ public class WordCountHBaseIT { public void run(Pipeline pipeline) throws Exception { Random rand = new Random(); - int postFix = Math.abs(rand.nextInt()); + int postFix = rand.nextInt() & 0x7FFFFFFF; String inputTableName = "crunch_words_" + postFix; String outputTableName = "crunch_counts_" + postFix; String otherTableName = "crunch_other_" + postFix; @@ -180,13 +181,16 @@ public class WordCountHBaseIT { // verify we can do joins. HTable joinTable = hbaseTestUtil.createTable(Bytes.toBytes(joinTableName), WORD_COLFAM); - - key = 0; - key = put(joinTable, key, "zebra"); - key = put(joinTable, key, "donkey"); - key = put(joinTable, key, "bird"); - key = put(joinTable, key, "horse"); - joinTable.flushCommits(); + try { + key = 0; + key = put(joinTable, key, "zebra"); + key = put(joinTable, key, "donkey"); + key = put(joinTable, key, "bird"); + key = put(joinTable, key, "horse"); + joinTable.flushCommits(); + } finally { + joinTable.close(); + } Scan joinScan = new Scan(); joinScan.addFamily(WORD_COLFAM); http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java index 84de288..4a721f3 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java @@ -66,8 +66,9 @@ public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Resu HTable htable = new HTable(hconf, table); String[] scanStrings = StringUtils.getStrings(scansAsString); - Scan[] scans = new Scan[scanStrings.length]; - for(int i = 0; i < scanStrings.length; i++){ + int length = scanStrings == null ? 0 : scanStrings.length; + Scan[] scans = new Scan[length]; + for(int i = 0; i < length; i++){ scans[i] = HBaseSourceTarget.convertStringToScan(scanStrings[i]); } http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java ---------------------------------------------------------------------- diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java index 883d0f0..716b291 100644 --- a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java +++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java @@ -87,6 +87,11 @@ public class OrcWritable implements WritableComparable<OrcWritable> { return compareTo((OrcWritable) obj) == 0; } + @Override + public int hashCode() { + return blob == null ? 0 : blob.hashCode(); + } + public void setSerde(BinarySortableSerDe serde) { this.serde = serde; } http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java index 51b65af..d799842 100644 --- a/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java +++ b/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java @@ -24,8 +24,8 @@ import org.junit.Test; import java.util.Map; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertFalse; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; public class SparkPipelineCallableIT extends CrunchTestSupport { @Test http://git-wip-us.apache.org/repos/asf/crunch/blob/d0bb205e/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java index 4c0cb27..5798e4c 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java @@ -339,7 +339,7 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe job.getOutputFormatClass(), job.getConfiguration()); pt.handleOutputs(job.getConfiguration(), tmpPath, -1); - } else if (t instanceof MapReduceTarget) { + } else { //if (t instanceof MapReduceTarget) { MapReduceTarget mrt = (MapReduceTarget) t; mrt.configureForMapReduce(job, ptype, new Path("/tmp"), "out0"); CrunchOutputs.OutputConfig outConfig = @@ -348,8 +348,6 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe job.setOutputKeyClass(outConfig.keyClass); job.setOutputValueClass(outConfig.valueClass); outRDD.saveAsHadoopDataset(new JobConf(job.getConfiguration())); - } else { - throw new IllegalArgumentException("Spark execution cannot handle non-MapReduceTarget: " + t); } } catch (Exception et) { LOG.error("Spark Exception", et);
