Add hadoop progressable compatibility. Patch by Ben Coverston, reviewed by brandonwilliams for CASSANDRA-5201
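
The patch drops the com.twitter.elephantbird elephant-bird-hadoop-compat dependency and the in-tree org.apache.cassandra.hadoop.Progressable wrapper in favor of a local HadoopCompat reflection shim plus the stock org.apache.hadoop.util.Progressable. Each record writer now keeps two nullable fields, one per Hadoop API generation, and reports progress through whichever one was set when the writer was constructed. A minimal sketch of the pattern, using an illustrative SketchWriter class rather than the patch's exact code:

    import org.apache.cassandra.hadoop.HadoopCompat;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.util.Progressable;

    // Illustrative only; the real classes in the diff below are BulkRecordWriter,
    // ColumnFamilyRecordWriter and CqlRecordWriter.
    class SketchWriter
    {
        private Progressable progress;      // set via the deprecated mapred getRecordWriter()
        private TaskAttemptContext context; // set via the mapreduce constructor

        void reportProgress()
        {
            // At most one of the two fields is non-null.
            if (progress != null)
                progress.progress();            // direct call; same signature on 1.x and 2.x
            if (context != null)
                HadoopCompat.progress(context); // reflection shim: TaskAttemptContext is a
                                                // class on Hadoop 1.x but an interface on 2.x
        }
    }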
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/24923083
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/24923083
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/24923083

Branch: refs/heads/cassandra-2.1
Commit: 249230834c2ce1ac169b2b3228d5d222f5ecacc2
Parents: ab2717b
Author: Brandon Williams <[email protected]>
Authored: Wed Mar 5 11:21:35 2014 -0600
Committer: Brandon Williams <[email protected]>
Committed: Wed Mar 5 11:21:35 2014 -0600

----------------------------------------------------------------------
 build.xml                                        |   3 -
 .../hadoop/AbstractColumnFamilyInputFormat.java  |   1 -
 .../AbstractColumnFamilyOutputFormat.java        |   1 -
 .../AbstractColumnFamilyRecordWriter.java        |   2 +
 .../cassandra/hadoop/BulkOutputFormat.java       |   3 +-
 .../cassandra/hadoop/BulkRecordWriter.java       |  16 +-
 .../hadoop/ColumnFamilyInputFormat.java          |   1 -
 .../hadoop/ColumnFamilyOutputFormat.java         |   2 +-
 .../hadoop/ColumnFamilyRecordReader.java         |   1 -
 .../hadoop/ColumnFamilyRecordWriter.java         |  15 +-
 .../apache/cassandra/hadoop/HadoopCompat.java    | 309 +++++++++++++++++++
 .../apache/cassandra/hadoop/Progressable.java    |  50 ---
 .../cassandra/hadoop/cql3/CqlOutputFormat.java   |   3 +-
 .../hadoop/cql3/CqlPagingInputFormat.java        |   2 +-
 .../hadoop/cql3/CqlPagingRecordReader.java       |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordWriter.java   |  12 +-
 .../cassandra/hadoop/pig/CassandraStorage.java   |   2 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java  |   3 +-
 18 files changed, 346 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 77b2639..9972aa2 100644
--- a/build.xml
+++ b/build.xml
@@ -367,7 +367,6 @@
           </dependency>
           <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" version="1.0.3"/>
           <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" version="1.0.3"/>
-          <dependency groupId="com.twitter.elephantbird" artifactId="elephant-bird-hadoop-compat" version="4.3"/>
           <dependency groupId="org.apache.pig" artifactId="pig" version="0.10.0"/>
           <dependency groupId="net.java.dev.jna" artifactId="jna" version="3.2.7"/>
@@ -410,7 +409,6 @@
         <dependency groupId="org.apache.rat" artifactId="apache-rat"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
-        <dependency groupId="com.twitter.elephantbird" artifactId="elephant-bird-hadoop-compat"/>
        <dependency groupId="org.apache.pig" artifactId="pig"/>
        <dependency groupId="net.java.dev.jna" artifactId="jna"/>
@@ -474,7 +472,6 @@
       <!-- don't need hadoop classes to run, but if you use the hadoop stuff -->
       <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/>
       <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/>
-      <dependency groupId="com.twitter.elephantbird" artifactId="elephant-bird-hadoop-compat" optional="true"/>
       <dependency groupId="org.apache.pig" artifactId="pig" optional="true"/>

       <!-- don't need jna to run, but nice to have -->

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index f547fd0..ba79eee 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import com.twitter.elephantbird.util.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;


http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
index a3c4234..3041829 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.twitter.elephantbird.util.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;


http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
index 1956262..501ca65 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.transport.TTransport;
+import org.apache.hadoop.util.Progressable;
 
 
 /**
@@ -67,6 +68,7 @@ public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWrite
     protected final ConsistencyLevel consistencyLevel;
 
     protected Progressable progressable;
+    protected TaskAttemptContext context;
 
     protected AbstractColumnFamilyRecordWriter(Configuration conf)
     {


http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index 566d5ee..c3d8e05 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import com.twitter.elephantbird.util.HadoopCompat;
 import org.apache.cassandra.thrift.Mutation;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
@@ -61,7 +60,7 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
     @Deprecated
     public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
     {
-        return new BulkRecordWriter(job, new Progressable(progress));
+        return new BulkRecordWriter(job, progress);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index 28c6d2c..8bfc958 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -28,7 +28,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import com.twitter.elephantbird.util.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,6 +52,7 @@ import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
+import org.apache.hadoop.util.Progressable;
 
 final class BulkRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>>
 implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
@@ -67,6 +67,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
     private SSTableLoader loader;
     private File outputdir;
     private Progressable progress;
+    private TaskAttemptContext context;
     private int maxFailures;
 
     private enum CFType
@@ -87,10 +88,9 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
     BulkRecordWriter(TaskAttemptContext context)
     {
         this(HadoopCompat.getConfiguration(context));
-        this.progress = new Progressable(context);
+        this.context = context;
     }
 
-
     BulkRecordWriter(Configuration conf, Progressable progress)
     {
         this(conf);
@@ -205,7 +205,10 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                     writer.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000));
             }
         }
-        progress.progress();
+        if (null != progress)
+            progress.progress();
+        if (null != context)
+            HadoopCompat.progress(context);
     }
 
     @Override
@@ -236,7 +239,10 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
             }
             catch (ExecutionException | TimeoutException te)
             {
-                progress.progress();
+                if (null != progress)
+                    progress.progress();
+                if (null != context)
+                    HadoopCompat.progress(context);
             }
             catch (InterruptedException e)
             {


http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index df59408..6dd90f6 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.twitter.elephantbird.util.HadoopCompat;
 import org.apache.cassandra.db.Column;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index 724ba7d..49aaf99 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -51,7 +51,7 @@ public class ColumnFamilyOutputFormat extends AbstractColumnFamilyOutputFormat<B
     @Deprecated
     public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress)
     {
-        return new ColumnFamilyRecordWriter(job, new Progressable(progress));
+        return new ColumnFamilyRecordWriter(job, progress);
     }
 
     /**


http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 22d614e..0bc1c49 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.*;
-import com.twitter.elephantbird.util.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;


http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index 0ae2a67..d6a873b 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -23,7 +23,6 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.twitter.elephantbird.util.HadoopCompat;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.thrift.*;
@@ -31,6 +30,7 @@ import org.apache.cassandra.utils.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
+import org.apache.hadoop.util.Progressable;
 
 
 /**
@@ -62,9 +62,9 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
     ColumnFamilyRecordWriter(TaskAttemptContext context)
     {
         this(HadoopCompat.getConfiguration(context));
-        this.progressable = new Progressable(context);
-    }
+        this.context = context;
+    }
 
     ColumnFamilyRecordWriter(Configuration conf, Progressable progressable)
     {
         this(conf);
@@ -128,7 +128,10 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
         for (Mutation amut : value)
             client.put(Pair.create(keybuff, amut));
 
+        if (progressable != null)
             progressable.progress();
+        if (context != null)
+            HadoopCompat.progress(context);
     }
 
     /**
@@ -140,9 +143,9 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
         public final String columnFamily = ConfigHelper.getOutputColumnFamily(conf);
 
         /**
-         * Constructs an {@link RangeClient} for the given endpoints.
-         * @param endpoints the possible endpoints to execute the mutations on
-         */
+        * Constructs an {@link RangeClient} for the given endpoints.
+        * @param endpoints the possible endpoints to execute the mutations on
+        */
         public RangeClient(List<InetAddress> endpoints)
         {
             super(endpoints);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/HadoopCompat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/HadoopCompat.java b/src/java/org/apache/cassandra/hadoop/HadoopCompat.java
new file mode 100644
index 0000000..f2f7033
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/HadoopCompat.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hadoop;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/*
+ * This is based on ContextFactory.java from hadoop-2.0.x sources.
+ */
+
+/**
+ * Utility methods to allow applications to deal with inconsistencies in the
+ * MapReduce Context Objects API between Hadoop 1.x and 2.x.
+ */
+public class HadoopCompat {
+
+    private static final boolean useV21;
+
+    private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
+    private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
+    private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR;
+    private static final Constructor<?> GENERIC_COUNTER_CONSTRUCTOR;
+
+    private static final Field READER_FIELD;
+    private static final Field WRITER_FIELD;
+
+    private static final Method GET_CONFIGURATION_METHOD;
+    private static final Method SET_STATUS_METHOD;
+    private static final Method GET_COUNTER_METHOD;
+    private static final Method INCREMENT_COUNTER_METHOD;
+    private static final Method GET_TASK_ATTEMPT_ID;
+    private static final Method PROGRESS_METHOD;
+
+    static {
+        boolean v21 = true;
+        final String PACKAGE = "org.apache.hadoop.mapreduce";
+        try {
+            Class.forName(PACKAGE + ".task.JobContextImpl");
+        } catch (ClassNotFoundException cnfe) {
+            v21 = false;
+        }
+        useV21 = v21;
+        Class<?> jobContextCls;
+        Class<?> taskContextCls;
+        Class<?> taskIOContextCls;
+        Class<?> mapContextCls;
+        Class<?> genericCounterCls;
+        try {
+            if (v21) {
+                jobContextCls =
+                    Class.forName(PACKAGE+".task.JobContextImpl");
+                taskContextCls =
+                    Class.forName(PACKAGE+".task.TaskAttemptContextImpl");
+                taskIOContextCls =
+                    Class.forName(PACKAGE+".task.TaskInputOutputContextImpl");
+                mapContextCls = Class.forName(PACKAGE + ".task.MapContextImpl");
+                genericCounterCls = Class.forName(PACKAGE+".counters.GenericCounter");
+            } else {
+                jobContextCls =
+                    Class.forName(PACKAGE+".JobContext");
+                taskContextCls =
+                    Class.forName(PACKAGE+".TaskAttemptContext");
+                taskIOContextCls =
+                    Class.forName(PACKAGE+".TaskInputOutputContext");
+                mapContextCls = Class.forName(PACKAGE + ".MapContext");
+                genericCounterCls =
+                    Class.forName("org.apache.hadoop.mapred.Counters$Counter");
+            }
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Can't find class", e);
+        }
+        try {
+            JOB_CONTEXT_CONSTRUCTOR =
+                jobContextCls.getConstructor(Configuration.class, JobID.class);
+            JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
+            TASK_CONTEXT_CONSTRUCTOR =
+                taskContextCls.getConstructor(Configuration.class,
+                                              TaskAttemptID.class);
+            TASK_CONTEXT_CONSTRUCTOR.setAccessible(true);
+            GENERIC_COUNTER_CONSTRUCTOR =
+                genericCounterCls.getDeclaredConstructor(String.class,
+                                                         String.class,
+                                                         Long.TYPE);
+            GENERIC_COUNTER_CONSTRUCTOR.setAccessible(true);
+
+            if (useV21) {
+                MAP_CONTEXT_CONSTRUCTOR =
+                    mapContextCls.getDeclaredConstructor(Configuration.class,
+                                                         TaskAttemptID.class,
+                                                         RecordReader.class,
+                                                         RecordWriter.class,
+                                                         OutputCommitter.class,
+                                                         StatusReporter.class,
+                                                         InputSplit.class);
+                Method get_counter;
+                try {
+                    get_counter = Class.forName(PACKAGE + ".TaskAttemptContext").getMethod("getCounter", String.class,
+                                                                                           String.class);
+                } catch (Exception e) {
+                    get_counter = Class.forName(PACKAGE + ".TaskInputOutputContext").getMethod("getCounter",
+                                                                                               String.class, String.class);
+                }
+                GET_COUNTER_METHOD = get_counter;
+            } else {
+                MAP_CONTEXT_CONSTRUCTOR =
+                    mapContextCls.getConstructor(Configuration.class,
+                                                 TaskAttemptID.class,
+                                                 RecordReader.class,
+                                                 RecordWriter.class,
+                                                 OutputCommitter.class,
+                                                 StatusReporter.class,
+                                                 InputSplit.class);
+                GET_COUNTER_METHOD = Class.forName(PACKAGE+".TaskInputOutputContext")
+                                          .getMethod("getCounter", String.class, String.class);
+            }
+            MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
+            READER_FIELD = mapContextCls.getDeclaredField("reader");
+            READER_FIELD.setAccessible(true);
+            WRITER_FIELD = taskIOContextCls.getDeclaredField("output");
+            WRITER_FIELD.setAccessible(true);
+            GET_CONFIGURATION_METHOD = Class.forName(PACKAGE+".JobContext")
+                                            .getMethod("getConfiguration");
+            SET_STATUS_METHOD = Class.forName(PACKAGE+".TaskAttemptContext")
+                                     .getMethod("setStatus", String.class);
+            GET_TASK_ATTEMPT_ID = Class.forName(PACKAGE+".TaskAttemptContext")
+                                       .getMethod("getTaskAttemptID");
+            INCREMENT_COUNTER_METHOD = Class.forName(PACKAGE+".Counter")
+                                            .getMethod("increment", Long.TYPE);
+            PROGRESS_METHOD = Class.forName(PACKAGE+".TaskAttemptContext")
+                                   .getMethod("progress");
+
+        } catch (SecurityException e) {
+            throw new IllegalArgumentException("Can't run constructor ", e);
+        } catch (NoSuchMethodException e) {
+            throw new IllegalArgumentException("Can't find constructor ", e);
+        } catch (NoSuchFieldException e) {
+            throw new IllegalArgumentException("Can't find field ", e);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Can't find class", e);
+        }
+    }
+
+    /**
+     * True if runtime Hadoop version is 2.x, false otherwise.
+     */
+    public static boolean isVersion2x() {
+        return useV21;
+    }
+
+    private static Object newInstance(Constructor<?> constructor, Object...args) {
+        try {
+            return constructor.newInstance(args);
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("Can't instantiate " + constructor, e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("Can't instantiate " + constructor, e);
+        } catch (InvocationTargetException e) {
+            throw new IllegalArgumentException("Can't instantiate " + constructor, e);
+        }
+    }
+
+    /**
+     * Creates a JobContext from a JobConf and jobId using the correct
+     * constructor for the current Hadoop version. <code>jobId</code> could be null.
+     */
+    public static JobContext newJobContext(Configuration conf, JobID jobId) {
+        return (JobContext) newInstance(JOB_CONTEXT_CONSTRUCTOR, conf, jobId);
+    }
+
+    /**
+     * Creates a TaskAttemptContext from a JobConf and jobId using the correct
+     * constructor for the current Hadoop version.
+     */
+    public static TaskAttemptContext newTaskAttemptContext(
+            Configuration conf, TaskAttemptID taskAttemptId) {
+        return (TaskAttemptContext)
+            newInstance(TASK_CONTEXT_CONSTRUCTOR, conf, taskAttemptId);
+    }
+
+    /**
+     * Instantiates MapContext under Hadoop 1 and MapContextImpl under Hadoop 2.
+     */
+    public static MapContext newMapContext(Configuration conf,
+                                           TaskAttemptID taskAttemptID,
+                                           RecordReader recordReader,
+                                           RecordWriter recordWriter,
+                                           OutputCommitter outputCommitter,
+                                           StatusReporter statusReporter,
+                                           InputSplit inputSplit) {
+        return (MapContext) newInstance(MAP_CONTEXT_CONSTRUCTOR,
+                conf, taskAttemptID, recordReader, recordWriter, outputCommitter,
+                statusReporter, inputSplit);
+    }
+
+    /**
+     * @return with Hadoop 2 : <code>new GenericCounter(args)</code>,<br>
+     *         with Hadoop 1 : <code>new Counter(args)</code>
+     */
+    public static Counter newGenericCounter(String name, String displayName, long value) {
+        try {
+            return (Counter)
+                GENERIC_COUNTER_CONSTRUCTOR.newInstance(name, displayName, value);
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("Can't instantiate Counter", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("Can't instantiate Counter", e);
+        } catch (InvocationTargetException e) {
+            throw new IllegalArgumentException("Can't instantiate Counter", e);
+        }
+    }
+
+    /**
+     * Invokes a method and rethrows any exception as runtime exceptions.
+     */
+    private static Object invoke(Method method, Object obj, Object... args) {
+        try {
+            return method.invoke(obj, args);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("Can't invoke method " + method.getName(), e);
+        } catch (InvocationTargetException e) {
+            throw new IllegalArgumentException("Can't invoke method " + method.getName(), e);
+        }
+    }
+
+    /**
+     * Invoke getConfiguration() on JobContext. Works with both
+     * Hadoop 1 and 2.
+     */
+    public static Configuration getConfiguration(JobContext context) {
+        return (Configuration) invoke(GET_CONFIGURATION_METHOD, context);
+    }
+
+    /**
+     * Invoke setStatus() on TaskAttemptContext. Works with both
+     * Hadoop 1 and 2.
+     */
+    public static void setStatus(TaskAttemptContext context, String status) {
+        invoke(SET_STATUS_METHOD, context, status);
+    }
+
+    /**
+     * Returns TaskAttemptContext.getTaskAttemptID(). Works with both
+     * Hadoop 1 and 2.
+     */
+    public static TaskAttemptID getTaskAttemptID(TaskAttemptContext taskContext) {
+        return (TaskAttemptID) invoke(GET_TASK_ATTEMPT_ID, taskContext);
+    }
+
+    /**
+     * Invoke getCounter() on TaskInputOutputContext. Works with both
+     * Hadoop 1 and 2.
+     */
+    public static Counter getCounter(TaskInputOutputContext context,
+                                     String groupName, String counterName) {
+        return (Counter) invoke(GET_COUNTER_METHOD, context, groupName, counterName);
+    }
+
+    /**
+     * Invoke TaskAttemptContext.progress(). Works with both
+     * Hadoop 1 and 2.
+     */
+    public static void progress(TaskAttemptContext context) {
+        invoke(PROGRESS_METHOD, context);
+    }
+
+    /**
+     * Increment the counter. Works with both Hadoop 1 and 2.
+     */
+    public static void incrementCounter(Counter counter, long increment) {
+        // incrementing a count might be called often. Might be affected by
+        // cost of invoke(). might be good candidate to handle in a shim.
+        // (TODO Raghu) figure out how to achieve such a build with maven
+        invoke(INCREMENT_COUNTER_METHOD, counter, increment);
+    }
+}
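
The shim resolves the concrete context classes once in its static initializer (mapreduce.task.*Impl on Hadoop 2.x, the old concrete mapreduce classes on 1.x) and then exposes version-neutral factories and accessors, so it is also usable on its own. A hedged usage sketch; the demo class below is hypothetical and not part of the patch:

    import org.apache.cassandra.hadoop.HadoopCompat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    public class HadoopCompatDemo
    {
        public static void main(String[] args)
        {
            Configuration conf = new Configuration();

            // Reflectively picks mapreduce.TaskAttemptContext on Hadoop 1.x
            // and mapreduce.task.TaskAttemptContextImpl on 2.x.
            TaskAttemptContext ctx = HadoopCompat.newTaskAttemptContext(conf, new TaskAttemptID());

            // Accessors route through the same cached Method objects, so the
            // call sites compile and run against either Hadoop line.
            System.out.println("Hadoop 2.x runtime: " + HadoopCompat.isVersion2x());
            HadoopCompat.setStatus(ctx, "demo");
            HadoopCompat.progress(ctx);
            Configuration same = HadoopCompat.getConfiguration(ctx);
        }
    }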

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/Progressable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/Progressable.java b/src/java/org/apache/cassandra/hadoop/Progressable.java
deleted file mode 100644
index ac253ef..0000000
--- a/src/java/org/apache/cassandra/hadoop/Progressable.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package org.apache.cassandra.hadoop;
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-
-public class Progressable
-{
-    private TaskAttemptContext context;
-    private org.apache.hadoop.util.Progressable progressable;
-
-    public Progressable(TaskAttemptContext context)
-    {
-        this.context = context;
-    }
-
-    public Progressable(org.apache.hadoop.util.Progressable progressable)
-    {
-        this.progressable = progressable;
-    }
-
-    public void progress()
-    {
-        if (context != null)
-            context.progress();
-        else
-            progressable.progress();
-    }
-
-}


http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
index 3cc0cd1..7c89bef 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
@@ -25,7 +25,6 @@ import java.util.Map;
 
 import org.apache.cassandra.hadoop.AbstractColumnFamilyOutputFormat;
 import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.hadoop.Progressable;
 import org.apache.hadoop.mapreduce.*;
 
 /**
@@ -59,7 +58,7 @@ public class CqlOutputFormat extends AbstractColumnFamilyOutputFormat<Map<String
     @Deprecated
     public CqlRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
     {
-        return new CqlRecordWriter(job, new Progressable(progress));
+        return new CqlRecordWriter(job, progress);
     }
 
     /**


http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
index 6f4478e..96f2f94 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-import com.twitter.elephantbird.util.HadoopCompat;
+import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
 import org.apache.cassandra.hadoop.ReporterWrapper;
 import org.apache.hadoop.mapred.InputSplit;


http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
index da278d8..cee4b4b 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
@@ -26,7 +26,7 @@ import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterables;
-import com.twitter.elephantbird.util.HadoopCompat;
+import org.apache.cassandra.hadoop.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 690ef2e..9c2e156 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -23,7 +23,8 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
-import com.twitter.elephantbird.util.HadoopCompat;
+import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,7 +38,6 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.hadoop.AbstractColumnFamilyRecordWriter;
 import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.hadoop.Progressable;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -86,7 +86,7 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
     CqlRecordWriter(TaskAttemptContext context) throws IOException
     {
         this(HadoopCompat.getConfiguration(context));
-        this.progressable = new Progressable(context);
+        this.context = context;
     }
 
     CqlRecordWriter(Configuration conf, Progressable progressable) throws IOException
@@ -181,7 +181,11 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
             allValues.add(keyColumns.get(column));
 
         client.put(allValues);
-        progressable.progress();
+
+        if (progressable != null)
+            progressable.progress();
+        if (context != null)
+            HadoopCompat.progress(context);
     }
 
     /**


http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 3c34167..1b51762 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
-import com.twitter.elephantbird.util.HadoopCompat;
+import org.apache.cassandra.hadoop.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;


http://git-wip-us.apache.org/repos/asf/cassandra/blob/24923083/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index a5156b9..284b72a 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -23,9 +23,8 @@ import java.nio.charset.CharacterCodingException;
 
 import java.util.*;
 
-import com.twitter.elephantbird.util.HadoopCompat;
+import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.cassandra.cql3.CFDefinition;
-import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
