Author: gates
Date: Fri Jan 24 20:37:50 2014
New Revision: 1561158
URL: http://svn.apache.org/r1561158
Log:
HIVE-6248 HCatReader/Writer should hide Hadoop and Hive classes
Added:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/DataTransferFactory.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/DataTransferFactory.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/DataTransferFactory.java?rev=1561158&r1=1561157&r2=1561158&view=diff
==============================================================================
---
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/DataTransferFactory.java
(original)
+++
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/DataTransferFactory.java
Fri Jan 24 20:37:50 2014
@@ -21,8 +21,6 @@ package org.apache.hive.hcatalog.data.tr
import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hive.hcatalog.data.transfer.impl.HCatInputFormatReader;
import org.apache.hive.hcatalog.data.transfer.impl.HCatOutputFormatWriter;
import org.apache.hive.hcatalog.data.transfer.state.DefaultStateProvider;
@@ -56,16 +54,16 @@ public class DataTransferFactory {
* This should only be called once from every slave node to obtain an
instance
* of {@link HCatReader}.
*
- * @param split
- * input split obtained at master node
- * @param config
- * configuration obtained at master node
+ * @param context
+ * reader context obtained at the master node
+ * @param slaveNumber
+ * which slave this is, determines which part of the read is done
* @return {@link HCatReader}
*/
- public static HCatReader getHCatReader(final InputSplit split,
- final Configuration config) {
+ public static HCatReader getHCatReader(final ReaderContext context,
+ int slaveNumber) {
// In future, this may examine config to return appropriate HCatReader
- return getHCatReader(split, config, DefaultStateProvider.get());
+ return getHCatReader(context, slaveNumber, DefaultStateProvider.get());
}
/**
@@ -73,18 +71,19 @@ public class DataTransferFactory {
* of {@link HCatReader}. This should be called if an external system has
some
* state to provide to HCatalog.
*
- * @param split
- * input split obtained at master node
- * @param config
- * configuration obtained at master node
+ * @param context
+ * reader context obtained at the master node
+ * @param slaveNumber
+ * which slave this is, determines which part of the read is done
* @param sp
* {@link StateProvider}
* @return {@link HCatReader}
*/
- public static HCatReader getHCatReader(final InputSplit split,
- final Configuration config, StateProvider sp) {
+ public static HCatReader getHCatReader(final ReaderContext context,
+ int slaveNumber,
+ StateProvider sp) {
// In future, this may examine config to return appropriate HCatReader
- return new HCatInputFormatReader(split, config, sp);
+ return new HCatInputFormatReader(context, slaveNumber, sp);
}
/**
@@ -131,6 +130,6 @@ public class DataTransferFactory {
public static HCatWriter getHCatWriter(final WriterContext cntxt,
final StateProvider sp) {
// In future, this may examine context to return appropriate HCatWriter
- return new HCatOutputFormatWriter(cntxt.getConf(), sp);
+ return new HCatOutputFormatWriter(cntxt, sp);
}
}
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java?rev=1561158&r1=1561157&r2=1561158&view=diff
==============================================================================
---
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java
(original)
+++
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/HCatReader.java
Fri Jan 24 20:37:50 2014
@@ -95,11 +95,4 @@ public abstract class HCatReader {
this.conf = conf;
}
- public Configuration getConf() {
- if (null == conf) {
- throw new IllegalStateException(
- "HCatReader is not constructed correctly.");
- }
- return conf;
- }
}
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java?rev=1561158&r1=1561157&r2=1561158&view=diff
==============================================================================
---
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java
(original)
+++
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/ReaderContext.java
Fri Jan 24 20:37:50 2014
@@ -20,70 +20,22 @@
package org.apache.hive.hcatalog.data.transfer;
import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hive.hcatalog.mapreduce.HCatSplit;
/**
- * This class will contain information of different {@link InputSplit} obtained
- * at master node and configuration. This class implements
- * {@link Externalizable} so it can be serialized using standard java
- * mechanisms.
+ * This read context is obtained by the master node and should be distributed
+ * to the slaves. The contents of the class are opaque to the client. This
+ * interface extends {@link java.io.Externalizable} so that implementing
+ * classes can be serialized using standard Java mechanisms.
*/
-public class ReaderContext implements Externalizable, Configurable {
+public interface ReaderContext extends Externalizable {
- private static final long serialVersionUID = -2656468331739574367L;
- private List<InputSplit> splits;
- private Configuration conf;
+ /**
+ * Determine the number of splits available in this {@link ReaderContext}.
+ * The client is not required to have this many slave nodes,
+ * as one slave can be used to read multiple splits.
+ * @return number of splits
+ */
+ public int numSplits();
- public ReaderContext() {
- this.splits = new ArrayList<InputSplit>();
- this.conf = new Configuration();
- }
-
- public void setInputSplits(final List<InputSplit> splits) {
- this.splits = splits;
- }
-
- public List<InputSplit> getSplits() {
- return splits;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(final Configuration config) {
- conf = config;
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- conf.write(out);
- out.writeInt(splits.size());
- for (InputSplit split : splits) {
- ((HCatSplit) split).write(out);
- }
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- conf.readFields(in);
- int numOfSplits = in.readInt();
- for (int i = 0; i < numOfSplits; i++) {
- HCatSplit split = new HCatSplit();
- split.readFields(in);
- splits.add(split);
- }
- }
}
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java?rev=1561158&r1=1561157&r2=1561158&view=diff
==============================================================================
---
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java
(original)
+++
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/WriterContext.java
Fri Jan 24 20:37:50 2014
@@ -20,46 +20,14 @@
package org.apache.hive.hcatalog.data.transfer;
import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
/**
- * This contains information obtained at master node to help prepare slave
nodes
- * for writer. This class implements {@link Externalizable} so it can be
- * serialized using standard java mechanisms. Master should serialize it and
+ * This contains information obtained at master node to be distributed to
+ * slaves nodes that will do the writing.
+ * This class implements {@link Externalizable} so it can be
+ * serialized using standard Java mechanisms. Master should serialize it and
* make it available to slaves to prepare for writes.
*/
-public class WriterContext implements Externalizable, Configurable {
-
- private static final long serialVersionUID = -5899374262971611840L;
- private Configuration conf;
-
- public WriterContext() {
- conf = new Configuration();
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(final Configuration config) {
- this.conf = config;
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- conf.write(out);
- }
+public interface WriterContext extends Externalizable {
- @Override
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- conf.readFields(in);
- }
}
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java?rev=1561158&r1=1561157&r2=1561158&view=diff
==============================================================================
---
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java
(original)
+++
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatInputFormatReader.java
Fri Jan 24 20:37:50 2014
@@ -48,10 +48,10 @@ public class HCatInputFormatReader exten
private InputSplit split;
- public HCatInputFormatReader(InputSplit split, Configuration config,
- StateProvider sp) {
- super(config, sp);
- this.split = split;
+ public HCatInputFormatReader(ReaderContext context, int slaveNumber,
+ StateProvider sp) {
+ super(((ReaderContextImpl)context).getConf(), sp);
+ this.split = ((ReaderContextImpl)context).getSplits().get(slaveNumber);
}
public HCatInputFormatReader(ReadEntity info, Map<String, String> config) {
@@ -64,7 +64,7 @@ public class HCatInputFormatReader exten
Job job = new Job(conf);
HCatInputFormat hcif = HCatInputFormat.setInput(
job, re.getDbName(), re.getTableName(), re.getFilterString());
- ReaderContext cntxt = new ReaderContext();
+ ReaderContextImpl cntxt = new ReaderContextImpl();
cntxt.setInputSplits(hcif.getSplits(
ShimLoader.getHadoopShims().getHCatShim().createJobContext(job.getConfiguration(),
null)));
cntxt.setConf(job.getConfiguration());
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java?rev=1561158&r1=1561157&r2=1561158&view=diff
==============================================================================
---
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
(original)
+++
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
Fri Jan 24 20:37:50 2014
@@ -52,8 +52,8 @@ public class HCatOutputFormatWriter exte
super(we, config);
}
- public HCatOutputFormatWriter(Configuration config, StateProvider sp) {
- super(config, sp);
+ public HCatOutputFormatWriter(WriterContext cntxt, StateProvider sp) {
+ super(((WriterContextImpl)cntxt).getConf(), sp);
}
@Override
@@ -74,7 +74,7 @@ public class HCatOutputFormatWriter exte
} catch (InterruptedException e) {
throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
}
- WriterContext cntxt = new WriterContext();
+ WriterContextImpl cntxt = new WriterContextImpl();
cntxt.setConf(job.getConfiguration());
return cntxt;
}
@@ -124,10 +124,14 @@ public class HCatOutputFormatWriter exte
@Override
public void commit(WriterContext context) throws HCatException {
+ WriterContextImpl cntxtImpl = (WriterContextImpl)context;
try {
- new
HCatOutputFormat().getOutputCommitter(ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
- context.getConf(),
ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()))
-
.commitJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext(context.getConf(),
null));
+ new HCatOutputFormat().getOutputCommitter(
+ ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
+ cntxtImpl.getConf(),
+ ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()))
+
.commitJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext(
+ cntxtImpl.getConf(), null));
} catch (IOException e) {
throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
} catch (InterruptedException e) {
@@ -137,11 +141,14 @@ public class HCatOutputFormatWriter exte
@Override
public void abort(WriterContext context) throws HCatException {
+ WriterContextImpl cntxtImpl = (WriterContextImpl)context;
try {
- new
HCatOutputFormat().getOutputCommitter(ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
- context.getConf(),
ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()))
- .abortJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext(
- context.getConf(), null), State.FAILED);
+ new HCatOutputFormat().getOutputCommitter(
+ ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
+ cntxtImpl.getConf(),
+ ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()))
+ .abortJob(ShimLoader.getHadoopShims().getHCatShim().createJobContext(
+ cntxtImpl.getConf(), null), State.FAILED);
} catch (IOException e) {
throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
} catch (InterruptedException e) {
Added:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java?rev=1561158&view=auto
==============================================================================
---
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java
(added)
+++
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/ReaderContextImpl.java
Fri Jan 24 20:37:50 2014
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.data.transfer.impl;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+import org.apache.hive.hcatalog.mapreduce.HCatSplit;
+
+/**
+ * This class contains the list of {@link InputSplit}s obtained
+ * at master node and the configuration.
+ */
+class ReaderContextImpl implements ReaderContext, Configurable {
+
+ private static final long serialVersionUID = -2656468331739574367L;
+ private List<InputSplit> splits;
+ private Configuration conf;
+
+ public ReaderContextImpl() {
+ this.splits = new ArrayList<InputSplit>();
+ this.conf = new Configuration();
+ }
+
+ void setInputSplits(final List<InputSplit> splits) {
+ this.splits = splits;
+ }
+
+ List<InputSplit> getSplits() {
+ return splits;
+ }
+
+ @Override
+ public int numSplits() {
+ return splits.size();
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(final Configuration config) {
+ conf = config;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ conf.write(out);
+ out.writeInt(splits.size());
+ for (InputSplit split : splits) {
+ ((HCatSplit) split).write(out);
+ }
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ conf.readFields(in);
+ int numOfSplits = in.readInt();
+ for (int i = 0; i < numOfSplits; i++) {
+ HCatSplit split = new HCatSplit();
+ split.readFields(in);
+ splits.add(split);
+ }
+ }
+}
Added:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java?rev=1561158&view=auto
==============================================================================
---
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java
(added)
+++
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/transfer/impl/WriterContextImpl.java
Fri Jan 24 20:37:50 2014
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.data.transfer.impl;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.hcatalog.data.transfer.WriterContext;
+
+/**
+ * This contains information obtained at master node to help prepare slave
nodes
+ * for writer. This class implements {@link Externalizable} so it can be
+ * serialized using standard java mechanisms. Master should serialize it and
+ * make it available to slaves to prepare for writes.
+ */
+class WriterContextImpl implements WriterContext, Configurable {
+
+ private static final long serialVersionUID = -5899374262971611840L;
+ private Configuration conf;
+
+ public WriterContextImpl() {
+ conf = new Configuration();
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(final Configuration config) {
+ this.conf = config;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ conf.write(out);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ conf.readFields(in);
+ }
+}
Modified:
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java?rev=1561158&r1=1561157&r2=1561158&view=diff
==============================================================================
---
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java
(original)
+++
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java
Fri Jan 24 20:37:50 2014
@@ -95,8 +95,8 @@ public class TestReaderWriter extends HC
readCntxt = (ReaderContext) ois.readObject();
ois.close();
- for (InputSplit split : readCntxt.getSplits()) {
- runsInSlave(split, readCntxt.getConf());
+ for (int i = 0; i < readCntxt.numSplits(); i++) {
+ runsInSlave(readCntxt, i);
}
}
@@ -117,9 +117,9 @@ public class TestReaderWriter extends HC
return cntxt;
}
- private void runsInSlave(InputSplit split, Configuration config) throws
HCatException {
+ private void runsInSlave(ReaderContext cntxt, int slaveNum) throws
HCatException {
- HCatReader reader = DataTransferFactory.getHCatReader(split, config);
+ HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum);
Iterator<HCatRecord> itr = reader.read();
int i = 1;
while (itr.hasNext()) {