Author: edwardyoon
Date: Mon Mar 2 02:20:00 2015
New Revision: 1663195
URL: http://svn.apache.org/r1663195
Log:
HAMA-929: InstantiationException occurs when creating a new instance of Combiner
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Mon
Mar 2 02:20:00 2015
@@ -30,7 +30,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
/**
* BSPMessageBundle stores a group of messages so that they can be sent in batch
@@ -42,9 +41,6 @@ public class BSPMessageBundle<M extends
public static final Log LOG = LogFactory.getLog(BSPMessageBundle.class);
- private BSPMessageCompressor<M> compressor = null;
- private long threshold = 128;
-
private String className = null;
private int bundleSize = 0;
@@ -55,19 +51,6 @@ public class BSPMessageBundle<M extends
bundleSize = 0;
}
- ByteArrayOutputStream mbos = new ByteArrayOutputStream();
- DataOutputStream mdos = new DataOutputStream(mbos);
- ByteArrayInputStream mbis = null;
- DataInputStream mdis = null;
-
- public byte[] serialize(M message) throws IOException {
- mbos.reset();
- message.write(mdos);
- return mbos.toByteArray();
- }
-
- private byte[] msgBytes;
-
/**
* Add message to this bundle.
*
@@ -79,16 +62,6 @@ public class BSPMessageBundle<M extends
className = message.getClass().getName();
}
- if (compressor != null) {
- msgBytes = serialize(message);
- if (msgBytes.length > threshold) {
- bufferDos.writeBoolean(true);
- msgBytes = compressor.compress(msgBytes);
- bufferDos.writeInt(msgBytes.length);
- } else {
- bufferDos.writeBoolean(false);
- }
- }
message.write(bufferDos);
bundleSize++;
} catch (IOException e) {
@@ -133,18 +106,7 @@ public class BSPMessageBundle<M extends
}
msg = ReflectionUtils.newInstance(clazz, null);
-
- if (compressor != null && dis.readBoolean()) {
- int length = dis.readInt();
- msgBytes = new byte[length];
- dis.readFully(msgBytes);
-
- mbis = new ByteArrayInputStream(compressor.decompress(msgBytes));
- mdis = new DataInputStream(mbis);
- msg.readFields(mdis);
- } else {
- msg.readFields(dis);
- }
+ msg.readFields(dis);
} catch (IOException ie) {
LOG.error(ie);
@@ -167,11 +129,6 @@ public class BSPMessageBundle<M extends
return bundleSize;
}
- public void setCompressor(BSPMessageCompressor<M> compressor, long
threshold) {
- this.compressor = compressor;
- this.threshold = threshold;
- }
-
/**
* @return the byte length of messages
* @throws IOException
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Mon Mar
2 02:20:00 2015
@@ -249,7 +249,7 @@ public final class BSPPeerImpl<K1, V1, K
try {
if (splitClass != null) {
inputSplit = (InputSplit) ReflectionUtils.newInstance(
- getConfiguration().getClassByName(splitClass), getConfiguration());
+ conf.getClassByName(splitClass), conf);
}
} catch (ClassNotFoundException exp) {
IOException wrap = new IOException("Split class " + splitClass
@@ -257,7 +257,7 @@ public final class BSPPeerImpl<K1, V1, K
wrap.initCause(exp);
throw wrap;
}
-
+
if (inputSplit != null) {
DataInputBuffer splitBuffer = new DataInputBuffer();
splitBuffer.reset(split.getBytes(), 0, split.getLength());
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Mon
Mar 2 02:20:00 2015
@@ -20,7 +20,6 @@ package org.apache.hama.bsp;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
@@ -354,17 +353,9 @@ public class LocalBSPRunner implements J
peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
bundle.getLength());
- if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
- bundle.setCompressor(compressor,
- conf.getLong(Constants.MESSENGER_COMPRESSION_THRESHOLD, 512));
- }
-
- Iterator<M> it = bundle.iterator();
- while (it.hasNext()) {
- MANAGER_MAP.get(addr).localQueueForNextIteration.add(it.next());
- peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
- 1L);
- }
+ MANAGER_MAP.get(addr).localQueueForNextIteration.addBundle(bundle);
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
+ bundle.size());
}
@Override
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
Mon Mar 2 02:20:00 2015
@@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
-import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
@@ -95,7 +94,7 @@ public abstract class AbstractMessageMan
this.compressor = new BSPMessageCompressorFactory<M>().getCompressor(conf);
this.outgoingMessageManager = getOutgoingMessageManager();
- this.outgoingMessageManager.init(conf, compressor);
+ this.outgoingMessageManager.init(conf);
}
/*
@@ -256,13 +255,11 @@ public abstract class AbstractMessageMan
@Override
public void loopBackBundle(BSPMessageBundle<M> bundle) throws IOException {
- if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
- bundle.setCompressor(compressor,
- conf.getLong(Constants.MESSENGER_COMPRESSION_THRESHOLD, 128));
- }
-
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
bundle.size());
this.localQueueForNextIteration.addBundle(bundle);
+
+ // TODO checkpoint bundle itself instead of unpacked messages. -- edwardyoon
+ // notifyReceivedMessage(bundle);
}
@SuppressWarnings("unchecked")
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java
Mon Mar 2 02:20:00 2015
@@ -21,17 +21,14 @@ import java.net.InetSocketAddress;
import java.util.HashMap;
import org.apache.hadoop.io.Writable;
-import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
-import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
import org.apache.hama.util.BSPNetUtils;
public abstract class AbstractOutgoingMessageManager<M extends Writable>
implements OutgoingMessageManager<M> {
protected HamaConfiguration conf;
- protected BSPMessageCompressor<M> compressor;
protected final HashMap<String, InetSocketAddress> peerSocketCache = new
HashMap<String, InetSocketAddress>();
protected HashMap<InetSocketAddress, BSPMessageBundle<M>> outgoingBundles =
new HashMap<InetSocketAddress, BSPMessageBundle<M>>();
@@ -48,10 +45,6 @@ public abstract class AbstractOutgoingMe
if (!outgoingBundles.containsKey(targetPeerAddress)) {
BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
- if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
- bundle.setCompressor(compressor,
- conf.getLong(Constants.MESSENGER_COMPRESSION_THRESHOLD, 128));
- }
outgoingBundles.put(targetPeerAddress, bundle);
}
return targetPeerAddress;
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
Mon Mar 2 02:20:00 2015
@@ -17,6 +17,10 @@
*/
package org.apache.hama.bsp.message;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
@@ -26,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
@@ -118,9 +123,20 @@ public final class HamaAsyncMessageManag
throw new IllegalArgumentException("Can not find " + addr.toString()
+ " to transfer messages to!");
} else {
- peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
- bundle.getLength());
- bspPeerConnection.put(bundle);
+ if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
+ ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
+ DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
+ bundle.write(bufferDos);
+
+ byte[] compressed = compressor.compress(byteBuffer.toByteArray());
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+ compressed.length);
+ bspPeerConnection.put(compressed);
+ } else {
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+ bundle.getLength());
+ bspPeerConnection.put(bundle);
+ }
}
}
@@ -155,6 +171,18 @@ public final class HamaAsyncMessageManag
loopBackBundle(bundle);
}
+ @Override
+ public void put(byte[] compressedBundle) throws IOException {
+ byte[] decompressed = compressor.decompress(compressedBundle);
+
+ BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+ ByteArrayInputStream bis = new ByteArrayInputStream(decompressed);
+ DataInputStream dis = new DataInputStream(bis);
+ bundle.readFields(dis);
+
+ loopBackBundle(bundle);
+ }
+
@Override
public final long getProtocolVersion(String arg0, long arg1)
throws IOException {
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java
Mon Mar 2 02:20:00 2015
@@ -45,5 +45,7 @@ public interface HamaMessageManager<M ex
* @param messages
*/
public void put(BSPMessageBundle<M> messages) throws IOException;
+
+ public void put(byte[] compressedBundle) throws IOException;
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
Mon Mar 2 02:20:00 2015
@@ -17,6 +17,10 @@
*/
package org.apache.hama.bsp.message;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
@@ -26,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
@@ -118,14 +123,27 @@ public final class HamaMessageManagerImp
throw new IllegalArgumentException("Can not find " + addr.toString()
+ " to transfer messages to!");
} else {
- peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
bundle.getLength());
- bspPeerConnection.put(bundle);
+
System.out.println(conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION,
false));
+
+ if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
+ ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
+ DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
+ bundle.write(bufferDos);
+
+ byte[] compressed = compressor.compress(byteBuffer.toByteArray());
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+ compressed.length);
+ bspPeerConnection.put(compressed);
+ } else {
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+ bundle.getLength());
+ bspPeerConnection.put(bundle);
+ }
}
}
/**
- * @param addr socket address to which BSP Peer Connection will be
- * established
+ * @param addr socket address to which BSP Peer Connection will be established
* @return BSP Peer Connection, tried to return cached connection, else
* returns a new connection and caches it
* @throws IOException
@@ -155,6 +173,18 @@ public final class HamaMessageManagerImp
loopBackBundle(bundle);
}
+ @Override
+ public final void put(byte[] compressedBundle) throws IOException {
+ byte[] decompressed = compressor.decompress(compressedBundle);
+
+ BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+ ByteArrayInputStream bis = new ByteArrayInputStream(decompressed);
+ DataInputStream dis = new DataInputStream(bis);
+ bundle.readFields(dis);
+
+ loopBackBundle(bundle);
+ }
+
@Override
public final long getProtocolVersion(String arg0, long arg1)
throws IOException {
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
Mon Mar 2 02:20:00 2015
@@ -24,11 +24,10 @@ import java.util.Map.Entry;
import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
-import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
public interface OutgoingMessageManager<M extends Writable> {
- public void init(HamaConfiguration conf, BSPMessageCompressor<M> compressor);
+ public void init(HamaConfiguration conf);
public void addMessage(String peerName, M msg);
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
Mon Mar 2 02:20:00 2015
@@ -22,12 +22,11 @@ import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.Combiner;
-import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
+import org.apache.hama.util.ReflectionUtils;
public class OutgoingPOJOMessageBundle<M extends Writable> extends
AbstractOutgoingMessageManager<M> {
@@ -36,14 +35,16 @@ public class OutgoingPOJOMessageBundle<M
@SuppressWarnings("unchecked")
@Override
- public void init(HamaConfiguration conf, BSPMessageCompressor<M> compressor)
{
+ public void init(HamaConfiguration conf) {
this.conf = conf;
- this.compressor = compressor;
final String combinerName = conf.get(Constants.COMBINER_CLASS);
if (combinerName != null) {
- this.combiner = (Combiner<M>) ReflectionUtils.newInstance(
- conf.getClass(combinerName, Combiner.class), conf);
+ try {
+ this.combiner = (Combiner<M>)
ReflectionUtils.newInstance(combinerName);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
}
}
@@ -55,8 +56,6 @@ public class OutgoingPOJOMessageBundle<M
BSPMessageBundle<M> bundle = outgoingBundles.get(targetPeerAddress);
bundle.addMessage(msg);
BSPMessageBundle<M> combined = new BSPMessageBundle<M>();
- combined.setCompressor(compressor,
- conf.getLong("hama.messenger.compression.threshold", 128));
combined.addMessage(combiner.combine(bundle));
outgoingBundles.put(targetPeerAddress, combined);
} else {
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
---
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
(original)
+++
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
Mon Mar 2 02:20:00 2015
@@ -31,7 +31,7 @@ import org.apache.hama.HamaConfiguration
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.message.AbstractOutgoingMessageManager;
-import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
+import org.apache.hama.util.ReflectionUtils;
public class OutgoingVertexMessageManager<M extends Writable> extends
AbstractOutgoingMessageManager<GraphJobMessage> {
@@ -44,17 +44,16 @@ public class OutgoingVertexMessageManage
@SuppressWarnings("unchecked")
@Override
- public void init(HamaConfiguration conf,
- BSPMessageCompressor<GraphJobMessage> compressor) {
+ public void init(HamaConfiguration conf) {
this.conf = conf;
- this.compressor = compressor;
- if (!conf.getClass(Constants.COMBINER_CLASS, Combiner.class).equals(
- Combiner.class)) {
-
- combiner = (Combiner<Writable>) org.apache.hadoop.util.ReflectionUtils
- .newInstance(conf.getClass(Constants.COMBINER_CLASS, Combiner.class),
- conf);
+ final String combinerName = conf.get(Constants.COMBINER_CLASS);
+ if (combinerName != null) {
+ try {
+ combiner = (Combiner<Writable>)
ReflectionUtils.newInstance(combinerName);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
}
}