Author: gates
Date: Tue Feb 7 05:49:08 2012
New Revision: 1241353
URL: http://svn.apache.org/viewvc?rev=1241353&view=rev
Log:
HCATALOG-208. mapred HCatInputFormat/HCatOutputFormat changes to make it work
from hive
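
For context, the added TestHiveHCatInputFormat exercises this change by driving an HCatalog-backed table entirely from Hive. The sketch below condenses that flow; the class, method, and table names are illustrative only and are not part of this patch (it assumes a Hive Driver configured as in HCatDataCheckUtil.instantiateDriver()):

    // Minimal sketch, not part of the commit: declare a Hive table whose storage is
    // managed by the new HCatStorageHandlerImpl, then query it with plain HiveQL.
    import org.apache.hadoop.hive.ql.Driver;

    public class HCatFromHiveSketch {
      public static void readViaHive(Driver driver) throws Exception {
        // 'hcat.isd'/'hcat.osd' name the RCFile storage drivers, mirroring the new test.
        driver.run("create table demo_hcat_tbl (i int, j int, s string) "
            + "stored by 'org.apache.hcatalog.storagehandler.HCatStorageHandlerImpl' "
            + "tblproperties ('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver',"
            + "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver')");

        // Reads now go through HCatMapredInputFormat / HiveHCatSplitWrapper under the covers.
        driver.run("select i, j, s from demo_hcat_tbl where i = 3");
      }
    }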
Added:
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/DummyHCatAuthProvider.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/HCatDataCheckUtil.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapred/
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1241353&r1=1241352&r2=1241353&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Feb 7 05:49:08 2012
@@ -35,6 +35,8 @@ Release 0.3.0 - Unreleased
INCOMPATIBLE CHANGES
NEW FEATURES
+ HCAT-208. mapred HCatInputFormat/HCatOutputFormat changes to make it work from hive (khorgath via gates)
+
HCAT-207. Changes to current HCat subsystem to allow it to work with hive
(khorgath via gates)
HCAT-204. HCatRecord SerDe (khorgath via gates)
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java?rev=1241353&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java Tue Feb 7 05:49:08 2012
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.Pair;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.InitializeInput;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+import org.apache.hcatalog.storagehandler.HCatStorageHandlerImpl;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+
+
+public class HCatMapredInputFormat implements InputFormat {
+
+
+ private static final Log LOG = LogFactory.getLog(HCatMapredInputFormat.class);
+ HCatInputFormat hci;
+
+ public HCatMapredInputFormat(){
+ hci = new HCatInputFormat();
+ }
+
+ @Override
+ public RecordReader getRecordReader(InputSplit split, JobConf job,
+ Reporter arg2) throws IOException {
+ try {
+ org.apache.hadoop.mapreduce.RecordReader<WritableComparable, HCatRecord> rr;
+ TaskAttemptContext taContext
+ = HCatHadoopShims.Instance.get().createTaskAttemptContext(job, new TaskAttemptID());
+ rr = hci.createRecordReader(((HiveHCatSplitWrapper)split).getHCatSplit(), taContext);
+ rr.initialize(((HiveHCatSplitWrapper)split).getHCatSplit(),taContext);
+ return (RecordReader) rr;
+
+ } catch (java.lang.InterruptedException e){
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int arg1) throws IOException {
+
+ try {
+ List<InputSplit> hsplits = new ArrayList<InputSplit>();
+ for (org.apache.hadoop.mapreduce.InputSplit hs : hci.getSplits(
+ HCatHadoopShims.Instance.get().createJobContext(job, new JobID()))){
+ HiveHCatSplitWrapper hwrapper = new HiveHCatSplitWrapper((HCatSplit)hs);
+
+ String hwrapperPath = hwrapper.getPath().toString();
+ String mapredInputDir = job.get("mapred.input.dir","null");
+
+ if(hwrapperPath.startsWith(mapredInputDir)){
+ hsplits.add(hwrapper);
+ }
+ }
+ InputSplit[] splits = new InputSplit[hsplits.size()];
+ for (int i = 0 ; i <hsplits.size(); i++){
+ splits[i] = hsplits.get(i);
+ }
+ return splits;
+ } catch (java.lang.InterruptedException e){
+ throw new IOException(e);
+ }
+ }
+
+ public static void setTableDesc(TableDesc tableDesc, Map<String,String> jobProperties) throws IOException{
+ try {
+ Pair<String,String> dbAndTableName = HCatUtil.getDbAndTableName(tableDesc.getTableName());
+ InputJobInfo info = InputJobInfo.create(dbAndTableName.first, dbAndTableName.second, "", null, null);
+ jobProperties.put(HCatConstants.HCAT_KEY_JOB_INFO
+ ,InitializeInput.getSerializedHcatKeyJobInfo(
+ null, info,tableDesc.getProperties().getProperty("location")));
+ } catch (Exception e){
+ throw new IOException(e);
+ }
+ }
+
+}
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java?rev=1241353&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java Tue Feb 7 05:49:08 2012
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.Pair;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils.CollectionBuilder;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.InitializeInput;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+import org.apache.hcatalog.storagehandler.HCatStorageHandlerImpl;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+public class HCatMapredOutputFormat implements OutputFormat, HiveOutputFormat {
+
+ HCatOutputFormat hco;
+ private static final Log LOG = LogFactory.getLog(HCatMapredOutputFormat.class);
+
+ public HCatMapredOutputFormat() {
+ LOG.debug("HCatMapredOutputFormat init");
+ hco = new HCatOutputFormat();
+ }
+
+ @Override
+ public void checkOutputSpecs(FileSystem arg0, JobConf arg1)
+ throws IOException {
+ LOG.debug("HCatMapredOutputFormat checkOutputSpecs");
+ JobContext context = HCatHadoopShims.Instance.get().createJobContext(arg1, new JobID());
+ try {
+ hco.checkOutputSpecs(context);
+ } catch (InterruptedException e) {
+ LOG.warn(e.getMessage());
+ HCatUtil.logStackTrace(LOG);
+ }
+ }
+
+ @Override
+ public RecordWriter getRecordWriter(FileSystem arg0, JobConf arg1,
+ String arg2, Progressable arg3) throws IOException {
+ // this is never really called from hive, but it's part of the IF interface
+
+ LOG.debug("HCatMapredOutputFormat getRecordWriter");
+ return getRW(arg1);
+ }
+
+ public HCatMapredRecordWriter getRW(Configuration arg1) throws IOException {
+ try {
+ JobContext jc = HCatHadoopShims.Instance.get().createJobContext(arg1, new JobID());
+ TaskAttemptContext taContext = HCatHadoopShims.Instance.get().createTaskAttemptContext(arg1, new TaskAttemptID());
+ return new HCatMapredOutputFormat.HCatMapredRecordWriter(hco,jc,taContext);
+ } catch (Exception e){
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+ JobConf jc, Path finalOutPath, Class valueClass, boolean isCompressed,
+ Properties tableProperties, Progressable progress) throws IOException {
+ LOG.debug("HCatMapredOutputFormat getHiveRecordWriter");
+ final HCatMapredRecordWriter rw = getRW(jc);
+ return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
+ public void write(Writable r) throws IOException {
+ rw.write(null, (HCatRecord) r);
+ }
+ public void close(boolean abort) throws IOException {
+ rw.setAbortStatus(abort);
+ rw.close(null);
+ }
+ };
+
+ }
+
+ public static void setTableDesc(TableDesc tableDesc, Map<String,String> jobProperties) throws IOException {
+ setTableDesc(tableDesc,jobProperties,new LinkedHashMap<String, String>());
+ }
+
+ public static void setPartitionDesc(PartitionDesc ptnDesc, Map<String,String> jobProperties) throws IOException {
+ setTableDesc(ptnDesc.getTableDesc(),jobProperties,ptnDesc.getPartSpec());
+ }
+
+ public static void setTableDesc(TableDesc tableDesc, Map<String,String> jobProperties, Map<String,String> ptnValues) throws IOException {
+ Pair<String,String> dbAndTableName = HCatUtil.getDbAndTableName(tableDesc.getTableName());
+
+ OutputJobInfo outputJobInfo = OutputJobInfo.create(
+ dbAndTableName.first, dbAndTableName.second,
+ ptnValues, null, null);
+
+ Job job = new Job(new Configuration());
+ // TODO : verify with thw if this needs to be shim-ed. There exists no current Shim
+ // for instantiating a Job, and we use it only temporarily.
+
+ HCatOutputFormat.setOutput(job, outputJobInfo);
+ LOG.debug("HCatOutputFormat.setOutput() done");
+
+ // Now we need to set the schema we intend to write
+
+ Properties tprops = tableDesc.getProperties();
+ String columnNameProperty = tprops.getProperty(Constants.LIST_COLUMNS);
+ String columnTypeProperty = tprops.getProperty(Constants.LIST_COLUMN_TYPES);
+
+ List<String> columnNames;
+ // all table column names
+ if (columnNameProperty.length() == 0) {
+ columnNames = new ArrayList<String>();
+ } else {
+ columnNames = Arrays.asList(columnNameProperty.split(","));
+ }
+
+ List<TypeInfo> columnTypes;
+ // all column types
+ if (columnTypeProperty.length() == 0) {
+ columnTypes = new ArrayList<TypeInfo>();
+ } else {
+ columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+ }
+
+ StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+ HCatSchema hsch = HCatSchemaUtils.getHCatSchema(rowTypeInfo).getFields().get(0).getStructSubSchema();
+ // getting inner schema, because it's the difference between struct<i:int,j:int> and i:int,j:int.
+ // and that's what we need to provide to HCatOutputFormat
+
+ LOG.debug("schema "+hsch.toString());
+ HCatOutputFormat.setSchema(job, hsch);
+
+ for (String confToSave : HCatConstants.OUTPUT_CONFS_TO_SAVE){
+ String confVal = job.getConfiguration().get(confToSave);
+ if (confVal != null){
+ jobProperties.put(confToSave, confVal);
+ }
+ }
+
+ }
+
+ public class HCatMapredRecordWriter implements org.apache.hadoop.mapred.RecordWriter<WritableComparable<?>, HCatRecord>{
+
+ org.apache.hadoop.mapreduce.RecordWriter writer;
+ org.apache.hadoop.mapreduce.OutputCommitter outputCommitter;
+ TaskAttemptContext taContext;
+ JobContext jc;
+ boolean jobIsSetup = false;
+ boolean wroteData = false;
+ boolean aborted = false;
+
+ public HCatMapredRecordWriter(
+ HCatOutputFormat hco, JobContext jc,
+ TaskAttemptContext taContext) throws IOException{
+ this.taContext = taContext;
+ try {
+ this.outputCommitter = hco.getOutputCommitter(taContext);
+ this.writer = hco.getRecordWriter(taContext);
+ } catch (java.lang.InterruptedException e){
+ throw new IOException(e);
+ }
+ this.wroteData = false;
+ this.aborted = false;
+ }
+
+ public void setAbortStatus(boolean abort) {
+ this.aborted = abort;
+ }
+
+ @Override
+ public void close(Reporter arg0) throws IOException {
+ try {
+ writer.close(taContext);
+ if (outputCommitter.needsTaskCommit(taContext)){
+ outputCommitter.commitTask(taContext);
+ }
+ if (this.wroteData && this.jobIsSetup){
+ if (!this.aborted){
+ outputCommitter.commitJob(taContext);
+ } else {
+ outputCommitter.cleanupJob(taContext);
+ }
+ }
+ } catch (Exception e){
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void write(WritableComparable arg0, HCatRecord arg1) throws IOException {
+ try {
+ if (!jobIsSetup){
+ this.outputCommitter.setupJob(taContext);
+ jobIsSetup = true;
+ }
+ writer.write(arg0, arg1);
+ this.wroteData = true;
+ } catch (Exception e){
+ throw new IOException(e);
+ }
+ }
+
+ }
+}
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java?rev=1241353&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java Tue Feb 7 05:49:08 2012
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+
+/**
+ * Even though HiveInputSplit expects an InputSplit to wrap, it
+ * expects getPath() to work from the underlying split. And since
+ * that's populated by HiveInputSplit only if the underlying
+ * split is a FileSplit, the HCatSplit that goes to Hive needs
+ * to be a FileSplit. And since FileSplit is a class, and
+ * mapreduce.InputSplit is also a class, we can't do the trick
+ * where we implement mapred.InputSplit and extend mapreduce.InputSplit.
+ *
+ * Thus, we compose the other HCatSplit, and work with it.
+ *
+ * Also, this means that reading HCat through Hive will only work
+ * when the underlying InputFormat's InputSplit has implemented
+ * a getPath() - either by subclassing FileSplit, or by itself -
+ * we make a best effort attempt to call a getPath() via reflection,
+ * but if that doesn't work, this isn't going to work.
+ *
+ */
+public class HiveHCatSplitWrapper extends FileSplit implements InputSplit {
+
+ Log LOG = LogFactory.getLog(HiveHCatSplitWrapper.class);
+
+ HCatSplit hsplit;
+
+ public HiveHCatSplitWrapper() {
+ super((Path) null, 0, 0, (String[]) null);
+ }
+
+ public HiveHCatSplitWrapper(HCatSplit hsplit) {
+ this();
+ this.hsplit = hsplit;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ hsplit = new HCatSplit();
+ hsplit.readFields(input);
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ hsplit.write(output);
+ }
+
+ @Override
+ public long getLength() {
+ return hsplit.getLength();
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ return hsplit.getLocations();
+ }
+
+ @Override
+ public Path getPath() {
+ /**
+ * This function is the reason this class exists at all.
+ * See class description for why.
+ */
+ if (hsplit.getBaseSplit() instanceof FileSplit){
+ // if baseSplit is a FileSplit, then return that.
+ return ((FileSplit)hsplit.getBaseSplit()).getPath();
+ } else {
+ // use reflection to try and determine if underlying class has a getPath() method that returns a path
+ Class<?> c = hsplit.getBaseSplit().getClass();
+ try {
+ return (Path) (c.getMethod("getPath")).invoke(hsplit.getBaseSplit());
+ } catch (Exception e) {
+ HCatUtil.logStackTrace(LOG);
+ // not much we can do - default exit will return null Path
+ }
+
+ }
+ LOG.error("Returning empty path from getPath(), Hive will not be happy.");
+ return new Path(""); // This will cause hive to error, but we can't do anything for that situation.
+ }
+
+ public HCatSplit getHCatSplit() {
+ return hsplit;
+ }
+
+}
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/DummyHCatAuthProvider.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/DummyHCatAuthProvider.java?rev=1241353&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/DummyHCatAuthProvider.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/DummyHCatAuthProvider.java Tue Feb 7 05:49:08 2012
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.storagehandler;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.ql.security.authorization.Privilege;
+
+/**
+ * This class is a dummy implementation of HiveAuthorizationProvider to provide
+ * dummy authorization functionality for other classes to extend and override.
+ */
+class DummyHCatAuthProvider implements HiveAuthorizationProvider {
+
+ @Override
+ public Configuration getConf() {
+ return null;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider
+ * #init(org.apache.hadoop.conf.Configuration)
+ */
+ @Override
+ public void init(Configuration conf) throws HiveException {
+ }
+
+ @Override
+ public HiveAuthenticationProvider getAuthenticator() {
+ return null;
+ }
+
+ @Override
+ public void setAuthenticator(HiveAuthenticationProvider authenticator) {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider
+ * #authorize(org.apache.hadoop.hive.ql.security.authorization.Privilege[],
+ * org.apache.hadoop.hive.ql.security.authorization.Privilege[])
+ */
+ @Override
+ public void authorize(Privilege[] readRequiredPriv,
+ Privilege[] writeRequiredPriv) throws HiveException,
+ AuthorizationException {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider
+ * #authorize(org.apache.hadoop.hive.metastore.api.Database,
+ * org.apache.hadoop.hive.ql.security.authorization.Privilege[],
+ * org.apache.hadoop.hive.ql.security.authorization.Privilege[])
+ */
+ @Override
+ public void authorize(Database db, Privilege[] readRequiredPriv,
+ Privilege[] writeRequiredPriv) throws HiveException,
+ AuthorizationException {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider
+ * #authorize(org.apache.hadoop.hive.ql.metadata.Table,
+ * org.apache.hadoop.hive.ql.security.authorization.Privilege[],
+ * org.apache.hadoop.hive.ql.security.authorization.Privilege[])
+ */
+ @Override
+ public void authorize(Table table, Privilege[] readRequiredPriv,
+ Privilege[] writeRequiredPriv) throws HiveException,
+ AuthorizationException {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider
+ * #authorize(org.apache.hadoop.hive.ql.metadata.Partition,
+ * org.apache.hadoop.hive.ql.security.authorization.Privilege[],
+ * org.apache.hadoop.hive.ql.security.authorization.Privilege[])
+ */
+ @Override
+ public void authorize(Partition part, Privilege[] readRequiredPriv,
+ Privilege[] writeRequiredPriv) throws HiveException,
+ AuthorizationException {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider
+ * #authorize(org.apache.hadoop.hive.ql.metadata.Table,
+ * org.apache.hadoop.hive.ql.metadata.Partition, java.util.List,
+ * org.apache.hadoop.hive.ql.security.authorization.Privilege[],
+ * org.apache.hadoop.hive.ql.security.authorization.Privilege[])
+ */
+ @Override
+ public void authorize(Table table, Partition part, List<String> columns,
+ Privilege[] readRequiredPriv, Privilege[] writeRequiredPriv)
+ throws HiveException, AuthorizationException {
+ }
+
+}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java?rev=1241353&r1=1241352&r2=1241353&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java Tue Feb 7 05:49:08 2012
@@ -186,7 +186,7 @@ public abstract class HCatStorageHandler
* ()
*/
@Override
- public final Class<? extends InputFormat> getInputFormatClass() {
+ public Class<? extends InputFormat> getInputFormatClass() {
return DummyInputFormat.class;
}
@@ -198,7 +198,7 @@ public abstract class HCatStorageHandler
* ()
*/
@Override
- public final Class<? extends OutputFormat> getOutputFormatClass() {
+ public Class<? extends OutputFormat> getOutputFormatClass() {
return DummyOutputFormat.class;
}
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java?rev=1241353&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java Tue Feb 7 05:49:08 2012
@@ -0,0 +1,163 @@
+package org.apache.hcatalog.storagehandler;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecordSerDe;
+import org.apache.hcatalog.mapred.HCatMapredInputFormat;
+import org.apache.hcatalog.mapred.HCatMapredOutputFormat;
+import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.storagehandler.HCatStorageHandler.DummyInputFormat;
+import org.apache.hcatalog.storagehandler.HCatStorageHandler.DummyOutputFormat;
+
+public class HCatStorageHandlerImpl extends HCatStorageHandler {
+
+ Class isd;
+ Class osd;
+
+ Log LOG = LogFactory.getLog(HCatStorageHandlerImpl.class);
+
+ @Override
+ public Class<? extends HCatInputStorageDriver> getInputStorageDriver() {
+ return isd;
+ }
+
+ @Override
+ public Class<? extends HCatOutputStorageDriver> getOutputStorageDriver() {
+ return osd;
+ }
+
+ @Override
+ public HiveAuthorizationProvider getAuthorizationProvider()
+ throws HiveException {
+ return new DummyHCatAuthProvider();
+ }
+
+ @Override
+ public void commitCreateTable(Table table) throws MetaException {
+ }
+
+ @Override
+ public void commitDropTable(Table table, boolean deleteData)
+ throws MetaException {
+ // do nothing special
+ }
+
+ @Override
+ public void preCreateTable(Table table) throws MetaException {
+ // do nothing special
+ }
+
+ @Override
+ public void preDropTable(Table table) throws MetaException {
+ // do nothing special
+ }
+
+ @Override
+ public void rollbackCreateTable(Table table) throws MetaException {
+ // do nothing special
+ }
+
+ @Override
+ public void rollbackDropTable(Table table) throws MetaException {
+ // do nothing special
+ }
+
+ @Override
+ public HiveMetaHook getMetaHook() {
+ return this;
+ }
+
+ @Override
+ public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+ // Information about the table and the job to be performed
+ // We pass them on into the mapredif / mapredof
+
+ Properties tprops = tableDesc.getProperties();
+
+ if(LOG.isDebugEnabled()){
+ LOG.debug("HCatStorageHandlerImpl configureTableJobProperties:");
+ HCatUtil.logStackTrace(LOG);
+ HCatUtil.logMap(LOG, "jobProperties", jobProperties);
+ if (tprops!= null){
+ HCatUtil.logEntrySet(LOG, "tableprops", tprops.entrySet());
+ }
+ LOG.debug("tablename : "+tableDesc.getTableName());
+ }
+
+ // copy existing table props first
+ for (Entry e : tprops.entrySet()){
+ jobProperties.put((String)e.getKey(), (String)e.getValue());
+ }
+
+ // try to set input format related properties
+ try {
+ HCatMapredInputFormat.setTableDesc(tableDesc,jobProperties);
+ } catch (IOException ioe){
+ // ok, things are probably not going to work, but we
+ // can't throw out exceptions per interface. So, we log.
+ LOG.error("HCatInputFormat init fail " + ioe.getMessage());
+ LOG.error(ioe.getStackTrace());
+ }
+
+ // try to set output format related properties
+ try {
+ HCatMapredOutputFormat.setTableDesc(tableDesc,jobProperties);
+ } catch (IOException ioe){
+ // ok, things are probably not going to work, but we
+ // can't throw out exceptions per interface. So, we log.
+ LOG.error("HCatOutputFormat init fail " + ioe.getMessage());
+ LOG.error(ioe.getStackTrace());
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return null;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ }
+
+ @Override
+ public Class<? extends SerDe> getSerDeClass() {
+ return HCatRecordSerDe.class;
+ }
+
+ @Override
+ public final Class<? extends InputFormat> getInputFormatClass() {
+ return HCatMapredInputFormat.class;
+ }
+
+ @Override
+ public final Class<? extends OutputFormat> getOutputFormatClass() {
+ return HCatMapredOutputFormat.class;
+ }
+
+}
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/HCatDataCheckUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/HCatDataCheckUtil.java?rev=1241353&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/HCatDataCheckUtil.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/HCatDataCheckUtil.java Tue Feb 7 05:49:08 2012
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.data;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+
+/**
+ * Helper class for Other Data Testers
+ */
+public class HCatDataCheckUtil {
+
+ public static Driver instantiateDriver(MiniCluster cluster) {
+ HiveConf hiveConf = new HiveConf(HCatDataCheckUtil.class);
+ for (Entry e : cluster.getProperties().entrySet()){
+ hiveConf.set(e.getKey().toString(), e.getValue().toString());
+ }
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+
+ Log logger = LogFactory.getLog(HCatOutputFormat.class);
+ HCatUtil.logHiveConf(logger , hiveConf);
+
+ Driver driver = new Driver(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+ return driver;
+ }
+
+ public static void generateDataFile(MiniCluster cluster, String fileName) throws IOException {
+ MiniCluster.deleteFile(cluster, fileName);
+ String[] input = new String[50];
+ for(int i = 0; i < 50; i++) {
+ input[i] = (i % 5) + "\t" + i + "\t" + "_S" + i + "S_";
+ }
+ MiniCluster.createInputFile(cluster, fileName, input);
+ }
+
+ public static void createTable(Driver driver, String tableName, String createTableArgs)
+ throws CommandNeedRetryException, IOException {
+ String createTable = "create table " + tableName + createTableArgs;
+ int retCode = driver.run(createTable).getResponseCode();
+ if(retCode != 0) {
+ throw new IOException("Failed to create table. ["+createTable+"], return code from hive driver : ["+retCode+"]");
+ }
+ }
+
+ public static void dropTable(Driver driver, String tablename) throws IOException, CommandNeedRetryException{
+ driver.run("drop table if exists "+tablename);
+ }
+
+ public static ArrayList<String> formattedRun(Driver driver, String name, String selectCmd)
+ throws CommandNeedRetryException, IOException {
+ driver.run(selectCmd);
+ ArrayList<String> src_values = new ArrayList<String>();
+ driver.getResults(src_values);
+ for (String s : src_values){
+ System.out.println(name+":"+s);
+ }
+ return src_values;
+ }
+
+}
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java?rev=1241353&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java Tue Feb 7 05:49:08 2012
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.hcatalog.data.HCatDataCheckUtil;
+import org.apache.hcatalog.mapred.HCatMapredInputFormat;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.storagehandler.HCatStorageHandlerImpl;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.util.UDFContext;
+
+public class TestHiveHCatInputFormat extends TestCase {
+ private static MiniCluster cluster = MiniCluster.buildCluster();
+ private static Driver driver;
+
+ String PTNED_TABLE = "junit_testhiveinputintegration_ptni";
+ String UNPTNED_TABLE = "junit_testhiveinputintegration_noptn";
+ String basicFile = "/tmp/"+PTNED_TABLE+".file";
+
+ public void testFromHive() throws Exception {
+ if (driver == null){
+ driver = HCatDataCheckUtil.instantiateDriver(cluster);
+ }
+
+ Properties props = new Properties();
+ props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+ String basicFileFullName = cluster.getProperties().getProperty("fs.default.name") + basicFile;
+
+ cleanup();
+
+ // create source data file
+ HCatDataCheckUtil.generateDataFile(cluster,basicFile);
+
+ String createPtnedTable = "(j int, s string) partitioned by (i int) "
+ +"stored by '"+HCatStorageHandlerImpl.class.getName()+"' tblproperties"
+ + "('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver',"
+ + "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+
+ HCatDataCheckUtil.createTable(driver,PTNED_TABLE,createPtnedTable);
+
+ String createUnptnedTable = "(i int, j int, s string) "
+ +"stored by '"+HCatStorageHandlerImpl.class.getName()+"' tblproperties"
+ + "('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver',"
+ + "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+
+ HCatDataCheckUtil.createTable(driver,UNPTNED_TABLE,createUnptnedTable);
+
+
+ driver.run("describe extended "+UNPTNED_TABLE);
+ ArrayList<String> des_values = new ArrayList<String>();
+ driver.getResults(des_values);
+ for (String s : des_values){
+ System.err.println("du:"+s);
+ }
+
+ driver.run("describe extended "+PTNED_TABLE);
+ ArrayList<String> des2_values = new ArrayList<String>();
+ driver.getResults(des2_values);
+ for (String s : des2_values){
+ System.err.println("dp:"+s);
+ }
+
+ // use pig to read from source file and put into this table
+
+ PigServer server = new PigServer(ExecType.LOCAL, props);
+ UDFContext.getUDFContext().setClientSystemProps();
+ server.setBatchOn();
+ server.registerQuery("A = load '"+basicFileFullName+"' as (i:int, j:int,
s:chararray);");
+ server.registerQuery("store A into '"+UNPTNED_TABLE+"' using
org.apache.hcatalog.pig.HCatStorer();");
+ server.executeBatch();
+
+ server.setBatchOn();
+ server.registerQuery("A = load '"+basicFileFullName+"' as (i:int, j:int,
s:chararray);");
+ server.registerQuery("store A into '"+PTNED_TABLE+"' using
org.apache.hcatalog.pig.HCatStorer();");
+ server.executeBatch();
+
+ // partitioned by i
+ // select * from tbl;
+ // select j,s,i from tbl;
+ // select * from tbl where i = 3;
+ // select j,s,i from tbl where i = 3;
+ // select * from tbl where j = 3;
+ // select j,s,i from tbl where j = 3;
+
+ ArrayList<String> p_select_star_nofilter = HCatDataCheckUtil.formattedRun(driver,
+ "p_select_star_nofilter","select * from "+PTNED_TABLE);
+ ArrayList<String> p_select_named_nofilter = HCatDataCheckUtil.formattedRun(driver,
+ "p_select_named_nofilter","select j,s,i from "+PTNED_TABLE);
+
+ assertDataIdentical(p_select_star_nofilter,p_select_named_nofilter,50);
+
+ ArrayList<String> p_select_star_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
+ "p_select_star_ptnfilter","select * from "+PTNED_TABLE+" where i = 3");
+ ArrayList<String> p_select_named_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
+ "p_select_named_ptnfilter","select j,s,i from "+PTNED_TABLE+" where i = 3");
+
+ assertDataIdentical(p_select_star_ptnfilter,p_select_named_ptnfilter,10);
+
+ ArrayList<String> select_star_nonptnfilter = HCatDataCheckUtil.formattedRun(driver,
+ "select_star_nonptnfilter","select * from "+PTNED_TABLE+" where j = 28");
+ ArrayList<String> select_named_nonptnfilter = HCatDataCheckUtil.formattedRun(driver,
+ "select_named_nonptnfilter","select j,s,i from "+PTNED_TABLE+" where j = 28");
+
+ assertDataIdentical(select_star_nonptnfilter,select_named_nonptnfilter,1);
+
+ // non-partitioned
+ // select * from tbl;
+ // select i,j,s from tbl;
+ // select * from tbl where i = 3;
+ // select i,j,s from tbl where i = 3;
+
+ // select j,s,i from tbl;
+ // select j,s,i from tbl where i = 3;
+
+ ArrayList<String> select_star_nofilter = HCatDataCheckUtil.formattedRun(driver,
+ "select_star_nofilter","select * from "+UNPTNED_TABLE); //i,j,s select * order is diff for unptn
+ ArrayList<String> select_ijs_nofilter = HCatDataCheckUtil.formattedRun(driver,
+ "select_ijs_nofilter","select i,j,s from "+UNPTNED_TABLE);
+
+ assertDataIdentical(select_star_nofilter,select_ijs_nofilter,50);
+
+ ArrayList<String> select_star_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
+ "select_star_ptnfilter","select * from "+UNPTNED_TABLE+" where i = 3"); //i,j,s
+ ArrayList<String> select_ijs_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
+ "select_ijs_ptnfilter","select i,j,s from "+UNPTNED_TABLE+" where i = 3");
+
+ assertDataIdentical(select_star_ptnfilter,select_ijs_ptnfilter,10);
+
+ ArrayList<String> select_jsi_nofilter = HCatDataCheckUtil.formattedRun(driver,
+ "select_jsi_nofilter","select j,s,i from "+UNPTNED_TABLE);
+ assertDataIdentical(p_select_named_nofilter,select_jsi_nofilter,50,true);
+
+ ArrayList<String> select_jsi_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
+ "select_jsi_ptnfilter","select j,s,i from "+UNPTNED_TABLE+" where i = 3");
+ assertDataIdentical(p_select_named_ptnfilter,select_jsi_ptnfilter,10,true);
+
+ }
+
+ private void assertDataIdentical(ArrayList<String> result1,
+ ArrayList<String> result2, int numRecords) {
+ assertDataIdentical(result1,result2,numRecords,false);
+ }
+
+ private void assertDataIdentical(ArrayList<String> result1,
+ ArrayList<String> result2, int numRecords,boolean doSort) {
+ assertEquals(numRecords, result1.size());
+ assertEquals(numRecords, result2.size());
+ Collections.sort(result1);
+ Collections.sort(result2);
+ for (int i = 0; i < numRecords; i++){
+ assertEquals(result1.get(i),result2.get(i));
+ }
+ }
+
+
+ private void cleanup() throws IOException, CommandNeedRetryException {
+ MiniCluster.deleteFile(cluster, basicFile);
+ HCatDataCheckUtil.dropTable(driver,PTNED_TABLE);
+ HCatDataCheckUtil.dropTable(driver,UNPTNED_TABLE);
+ }
+
+}