http://git-wip-us.apache.org/repos/asf/phoenix/blob/f34c186a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRecordUpdater.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRecordUpdater.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRecordUpdater.java new file mode 100644 index 0000000..089a299 --- /dev/null +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRecordUpdater.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hive; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.phoenix.hive.PhoenixSerializer.DmlType; +import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; +import org.apache.phoenix.hive.mapreduce.PhoenixResultWritable; +import org.apache.phoenix.hive.util.PhoenixConnectionUtil; +import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil; +import org.apache.phoenix.hive.util.PhoenixUtil; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.schema.ConcurrentTableMutationException; +import org.apache.phoenix.schema.MetaDataClient; +import org.apache.phoenix.util.QueryUtil; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Properties; + +public class PhoenixRecordUpdater implements RecordUpdater { + + private static final Log LOG = LogFactory.getLog(PhoenixRecordUpdater.class); + + private final Connection conn; + private final PreparedStatement pstmt; + private final long batchSize; + private long numRecords = 0; + + private Configuration config; + private String tableName; + private MetaDataClient metaDataClient; + private boolean restoreWalMode; + + private long rowCountDelta = 0; + + private PhoenixSerializer phoenixSerializer; + private ObjectInspector objInspector; + private PreparedStatement pstmtForDelete; + + public PhoenixRecordUpdater(Path path, AcidOutputFormat.Options options) throws IOException { + this.config = options.getConfiguration(); + tableName = config.get(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME); + + Properties props = new Properties(); + + try { + // Disable WAL + String walConfigName = 
tableName.toLowerCase() + PhoenixStorageHandlerConstants + .DISABLE_WAL; + boolean disableWal = config.getBoolean(walConfigName, false); + if (disableWal) { + if (LOG.isDebugEnabled()) { + LOG.debug(walConfigName + " is true. batch.mode will be set to true."); + } + + props.setProperty(PhoenixStorageHandlerConstants.BATCH_MODE, "true"); + } + + this.conn = PhoenixConnectionUtil.getInputConnection(config, props); + + if (disableWal) { + metaDataClient = new MetaDataClient((PhoenixConnection) conn); + + if (!PhoenixUtil.isDisabledWal(metaDataClient, tableName)) { + // execute alter table statement if disable_wal is not already true. + try { + PhoenixUtil.alterTableForWalDisable(conn, tableName, true); + } catch (ConcurrentTableMutationException e) { + if (LOG.isWarnEnabled()) { + LOG.warn("Concurrent modification of disableWAL"); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug(tableName + "'s WAL disabled."); + } + + // restore original value of disable_wal at the end. + restoreWalMode = true; + } + } + + this.batchSize = PhoenixConfigurationUtil.getBatchSize(config); + + if (LOG.isDebugEnabled()) { + LOG.debug("Batch-size : " + batchSize); + } + + String upsertQuery = QueryUtil.constructUpsertStatement(tableName, PhoenixUtil + .getColumnInfoList(conn, tableName)); + + if (LOG.isDebugEnabled()) { + LOG.debug("Upsert-query : " + upsertQuery); + } + this.pstmt = this.conn.prepareStatement(upsertQuery); + } catch (SQLException e) { + throw new IOException(e); + } + + this.objInspector = options.getInspector(); + try { + phoenixSerializer = new PhoenixSerializer(config, options.getTableProperties()); + } catch (SerDeException e) { + throw new IOException(e); + } + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hive.ql.io.RecordUpdater#insert(long, java.lang.Object) + */ + @Override + public void insert(long currentTransaction, Object row) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Insert - currentTransaction : " + currentTransaction + ", row : " + + PhoenixStorageHandlerUtil.toString(row)); + } + + PhoenixResultWritable pResultWritable = (PhoenixResultWritable) phoenixSerializer + .serialize(row, objInspector, DmlType.INSERT); + + if (LOG.isTraceEnabled()) { + LOG.trace("Data : " + pResultWritable.getValueList()); + } + + write(pResultWritable); + + rowCountDelta++; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hive.ql.io.RecordUpdater#update(long, java.lang.Object) + */ + @Override + public void update(long currentTransaction, Object row) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Update - currentTransaction : " + currentTransaction + ", row : " + + PhoenixStorageHandlerUtil.toString(row)); + } + + PhoenixResultWritable pResultWritable = (PhoenixResultWritable) phoenixSerializer + .serialize(row, objInspector, DmlType.UPDATE); + + if (LOG.isTraceEnabled()) { + LOG.trace("Data : " + pResultWritable.getValueList()); + } + + write(pResultWritable); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hive.ql.io.RecordUpdater#delete(long, java.lang.Object) + */ + @Override + public void delete(long currentTransaction, Object row) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Delete - currentTransaction : " + currentTransaction + ", row : " + + PhoenixStorageHandlerUtil.toString(row)); + } + + PhoenixResultWritable pResultWritable = (PhoenixResultWritable) phoenixSerializer + .serialize(row, objInspector, DmlType.DELETE); + + if (LOG.isTraceEnabled()) { + LOG.trace("Data : " + pResultWritable.getValueList()); + } + + if (pstmtForDelete ==
null) { + try { + String deleteQuery = PhoenixUtil.constructDeleteStatement(conn, tableName); + + if (LOG.isDebugEnabled()) { + LOG.debug("Delete query : " + deleteQuery); + } + + pstmtForDelete = conn.prepareStatement(deleteQuery); + } catch (SQLException e) { + throw new IOException(e); + } + } + + delete(pResultWritable); + + rowCountDelta--; + } + + private void delete(PhoenixResultWritable pResultWritable) throws IOException { + try { + pResultWritable.delete(pstmtForDelete); + numRecords++; + pstmtForDelete.executeUpdate(); + + if (numRecords % batchSize == 0) { + LOG.debug("Commit called on a batch of size : " + batchSize); + conn.commit(); + } + } catch (SQLException e) { + throw new IOException("Exception while deleting from table.", e); + } + } + + private void write(PhoenixResultWritable pResultWritable) throws IOException { + try { + pResultWritable.write(pstmt); + numRecords++; + pstmt.executeUpdate(); + + if (numRecords % batchSize == 0) { + LOG.debug("Commit called on a batch of size : " + batchSize); + conn.commit(); + } + } catch (SQLException e) { + throw new IOException("Exception while writing to table.", e); + } + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hive.ql.io.RecordUpdater#flush() + */ + @Override + public void flush() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Flush called"); + } + + try { + conn.commit(); + + if (LOG.isInfoEnabled()) { + LOG.info("Written rows : " + numRecords); + } + } catch (SQLException e) { + LOG.error("SQLException while performing the commit for the task."); + throw new IOException(e); + } + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hive.ql.io.RecordUpdater#close(boolean) + */ + @Override + public void close(boolean abort) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("abort : " + abort); + } + + try { + conn.commit(); + + if (LOG.isInfoEnabled()) { + LOG.info("Written rows : " + numRecords); + } + } catch (SQLException e) { + LOG.error("SQLException while performing the commit for the task."); + throw new IOException(e); + } finally { + try { + if (restoreWalMode && PhoenixUtil.isDisabledWal(metaDataClient, tableName)) { + try { + PhoenixUtil.alterTableForWalDisable(conn, tableName, false); + } catch (ConcurrentTableMutationException e) { + if (LOG.isWarnEnabled()) { + LOG.warn("Concurrent modification of disableWAL"); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug(tableName + "'s WAL enabled."); + } + } + + // flush when [table-name].auto.flush is true. + String autoFlushConfigName = tableName.toLowerCase() + + PhoenixStorageHandlerConstants.AUTO_FLUSH; + boolean autoFlush = config.getBoolean(autoFlushConfigName, false); + if (autoFlush) { + if (LOG.isDebugEnabled()) { + LOG.debug("autoFlush is " + autoFlush); + } + + PhoenixUtil.flush(conn, tableName); + } + + PhoenixUtil.closeResource(pstmt); + PhoenixUtil.closeResource(pstmtForDelete); + PhoenixUtil.closeResource(conn); + } catch (SQLException ex) { + LOG.error("SQLException while closing the connection for the task."); + throw new IOException(ex); + } + } + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hive.ql.io.RecordUpdater#getStats() + */ + @Override + public SerDeStats getStats() { + if (LOG.isDebugEnabled()) { + LOG.debug("getStats called"); + } + + SerDeStats stats = new SerDeStats(); + stats.setRowCount(rowCountDelta); + // Don't worry about setting raw data size diff. There is no reasonable way to calculate + // that without finding the row we are updating or deleting, which would be a mess. + return stats; + } + +}
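For readers scanning the PhoenixRecordUpdater diff above: its write path is the standard Phoenix JDBC batching pattern, namely autocommit off, a prepared UPSERT, and a commit every batchSize rows. A minimal standalone sketch of that pattern follows (not part of this patch; the connection URL and the TEST_TABLE schema are illustrative assumptions):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class UpsertBatchSketch {
    public static void main(String[] args) throws SQLException {
        // jdbc:phoenix:<quorum> -- "localhost" is an assumption for illustration.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
            conn.setAutoCommit(false); // batch mode, as in PhoenixRecordUpdater
            long batchSize = 1000L;    // supplied by PhoenixConfigurationUtil.getBatchSize() above
            long numRecords = 0;
            // hypothetical TEST_TABLE(ID INTEGER PRIMARY KEY, NAME VARCHAR)
            try (PreparedStatement pstmt =
                    conn.prepareStatement("UPSERT INTO TEST_TABLE (ID, NAME) VALUES (?, ?)")) {
                for (int i = 0; i < 10000; i++) {
                    pstmt.setInt(1, i);
                    pstmt.setString(2, "row-" + i);
                    pstmt.executeUpdate(); // mutations are buffered client-side until commit
                    if (++numRecords % batchSize == 0) {
                        conn.commit(); // flush one batch of mutations to the server
                    }
                }
            }
            conn.commit(); // push the tail of the last batch, as flush()/close(boolean) do above
        }
    }
}
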
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f34c186a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRow.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRow.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRow.java new file mode 100644 index 0000000..fa307ce --- /dev/null +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRow.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hive; + +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.serde2.StructObject; + +import java.util.List; +import java.util.Map; + +/** + * Implementation for Hive SerDe StructObject + */ +public class PhoenixRow implements StructObject { + + private List<String> columnList; + private Map<String, Object> resultRowMap; + + public PhoenixRow(List<String> columnList) { + this.columnList = columnList; + } + + public PhoenixRow setResultRowMap(Map<String, Object> resultRowMap) { + this.resultRowMap = resultRowMap; + return this; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hive.serde2.StructObject#getField(int) + */ + @Override + public Object getField(int fieldID) { + return resultRowMap.get(columnList.get(fieldID)); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hive.serde2.StructObject#getFieldsAsList() + */ + @Override + public List<Object> getFieldsAsList() { + return Lists.newArrayList(resultRowMap.values()); + } + + + @Override + public String toString() { + return resultRowMap.toString(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f34c186a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRowKey.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRowKey.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRowKey.java new file mode 100644 index 0000000..c4cbb2c --- /dev/null +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRowKey.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hive; + +import com.google.common.collect.Maps; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.util.Map; + +/** + * Hive's RecordIdentifier implementation. + */ + +public class PhoenixRowKey extends RecordIdentifier { + + private Map<String, Object> rowKeyMap = Maps.newHashMap(); + + public PhoenixRowKey() { + + } + + public void setRowKeyMap(Map<String, Object> rowKeyMap) { + this.rowKeyMap = rowKeyMap; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + super.write(dataOutput); + + try (ObjectOutputStream oos = new ObjectOutputStream((OutputStream) dataOutput)) { + oos.writeObject(rowKeyMap); + oos.flush(); + } + } + + @SuppressWarnings("unchecked") + @Override + public void readFields(DataInput dataInput) throws IOException { + super.readFields(dataInput); + + try (ObjectInputStream ois = new ObjectInputStream((InputStream) dataInput)) { + rowKeyMap = (Map<String, Object>) ois.readObject(); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f34c186a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java new file mode 100644 index 0000000..dd38cfb --- /dev/null +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.hive; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.Writable; +import org.apache.phoenix.hive.PhoenixSerializer.DmlType; +import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; +import org.apache.phoenix.hive.mapreduce.PhoenixResultWritable; +import org.apache.phoenix.hive.objectinspector.PhoenixObjectInspectorFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +/** + * SerDe implementation for Phoenix Hive Storage + * + */ +public class PhoenixSerDe extends AbstractSerDe { + + public static final Log LOG = LogFactory.getLog(PhoenixSerDe.class); + + private PhoenixSerializer serializer; + private ObjectInspector objectInspector; + + private LazySerDeParameters serdeParams; + private PhoenixRow row; + + private Properties tableProperties; + + /** + * @throws SerDeException + */ + public PhoenixSerDe() throws SerDeException { + if (LOG.isDebugEnabled()) { + LOG.debug("PhoenixSerDe created"); + } + } + + @Override + public void initialize(Configuration conf, Properties tbl) throws SerDeException { + tableProperties = tbl; + + if (LOG.isDebugEnabled()) { + LOG.debug("SerDe initialize : " + tbl.getProperty("name")); + } + + serdeParams = new LazySerDeParameters(conf, tbl, getClass().getName()); + objectInspector = createLazyPhoenixInspector(conf, tbl); + + String inOutWork = tbl.getProperty(PhoenixStorageHandlerConstants.IN_OUT_WORK); + if (inOutWork == null) { + return; + } + + serializer = new PhoenixSerializer(conf, tbl); + row = new PhoenixRow(Lists.transform(serdeParams.getColumnNames(), new Function<String, + String>() { + + @Override + public String apply(String input) { + return input.toUpperCase(); + } + })); + } + + @Override + public Object deserialize(Writable result) throws SerDeException { + if (!(result instanceof PhoenixResultWritable)) { + throw new SerDeException(result.getClass().getName() + ": expects " + + "PhoenixResultWritable!"); + } + + return row.setResultRowMap(((PhoenixResultWritable) result).getResultMap()); + } + + @Override + public Class<? 
extends Writable> getSerializedClass() { + return PhoenixResultWritable.class; + } + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + try { + return serializer.serialize(obj, objInspector, DmlType.NONE); + } catch (Exception e) { + throw new SerDeException(e); + } + } + + @Override + public SerDeStats getSerDeStats() { + // no support for statistics + return null; + } + + public Properties getTableProperties() { + return tableProperties; + } + + public LazySerDeParameters getSerdeParams() { + return serdeParams; + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return objectInspector; + } + + private ObjectInspector createLazyPhoenixInspector(Configuration conf, Properties tbl) throws + SerDeException { + List<String> columnNameList = Arrays.asList(tbl.getProperty(serdeConstants.LIST_COLUMNS) + .split(PhoenixStorageHandlerConstants.COMMA)); + List<TypeInfo> columnTypeList = TypeInfoUtils.getTypeInfosFromTypeString(tbl.getProperty + (serdeConstants.LIST_COLUMN_TYPES)); + + List<ObjectInspector> columnObjectInspectors = Lists.newArrayListWithExpectedSize + (columnTypeList.size()); + + for (TypeInfo typeInfo : columnTypeList) { + columnObjectInspectors.add(PhoenixObjectInspectorFactory.createObjectInspector + (typeInfo, serdeParams)); + } + + return LazyObjectInspectorFactory.getLazySimpleStructObjectInspector(columnNameList, + columnObjectInspectors, null, serdeParams.getSeparators()[0], serdeParams, + ObjectInspectorOptions.JAVA); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f34c186a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java new file mode 100644 index 0000000..e43ed0e --- /dev/null +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.hive; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; +import org.apache.phoenix.hive.mapreduce.PhoenixResultWritable; +import org.apache.phoenix.hive.util.PhoenixConnectionUtil; +import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil; +import org.apache.phoenix.hive.util.PhoenixUtil; +import org.apache.phoenix.util.ColumnInfo; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Serializer used in PhoenixSerDe and PhoenixRecordUpdater to produce Writable. + */ +public class PhoenixSerializer { + + private static final Log LOG = LogFactory.getLog(PhoenixSerializer.class); + + public static enum DmlType { + NONE, + SELECT, + INSERT, + UPDATE, + DELETE + } + + private int columnCount = 0; + private PhoenixResultWritable pResultWritable; + + public PhoenixSerializer(Configuration config, Properties tbl) throws SerDeException { + try (Connection conn = PhoenixConnectionUtil.getInputConnection(config, tbl)) { + List<ColumnInfo> columnMetadata = PhoenixUtil.getColumnInfoList(conn, tbl.getProperty + (PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME)); + + columnCount = columnMetadata.size(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Column-meta : " + columnMetadata); + } + + pResultWritable = new PhoenixResultWritable(config, columnMetadata); + } catch (SQLException | IOException e) { + throw new SerDeException(e); + } + } + + public Writable serialize(Object values, ObjectInspector objInspector, DmlType dmlType) { + pResultWritable.clear(); + + final StructObjectInspector structInspector = (StructObjectInspector) objInspector; + final List<? 
extends StructField> fieldList = structInspector.getAllStructFieldRefs(); + + if (LOG.isTraceEnabled()) { + LOG.trace("FieldList : " + fieldList + " values(" + values.getClass() + ") : " + + values); + } + + int fieldCount = columnCount; + if (dmlType == DmlType.UPDATE || dmlType == DmlType.DELETE) { + fieldCount++; + } + + for (int i = 0; i < fieldCount; i++) { + if (fieldList.size() <= i) { + break; + } + + StructField structField = fieldList.get(i); + + if (LOG.isTraceEnabled()) { + LOG.trace("structField[" + i + "] : " + structField); + } + + if (structField != null) { + Object fieldValue = structInspector.getStructFieldData(values, structField); + ObjectInspector fieldOI = structField.getFieldObjectInspector(); + + String fieldName = structField.getFieldName(); + + if (LOG.isTraceEnabled()) { + LOG.trace("Field " + fieldName + "[" + i + "] : " + fieldValue + ", " + + fieldOI); + } + + Object value = null; + switch (fieldOI.getCategory()) { + case PRIMITIVE: + value = ((PrimitiveObjectInspector) fieldOI).getPrimitiveJavaObject + (fieldValue); + + if (LOG.isTraceEnabled()) { + LOG.trace("Field " + fieldName + "[" + i + "] : " + value + "(" + value + .getClass() + ")"); + } + + if (value instanceof HiveDecimal) { + value = ((HiveDecimal) value).bigDecimalValue(); + } else if (value instanceof HiveChar) { + value = ((HiveChar) value).getValue().trim(); + } + + pResultWritable.add(value); + break; + case LIST: + // Arrays are not supported in insert statements yet. + break; + case STRUCT: + if (dmlType == DmlType.DELETE) { + // On update/delete, the first value is a struct<transactionid:bigint, + // bucketid:int,rowid:bigint,primaryKey:binary> + List<Object> fieldValueList = ((StandardStructObjectInspector) + fieldOI).getStructFieldsDataAsList(fieldValue); + + // Convert the primary key binary into a map. + @SuppressWarnings("unchecked") + Map<String, Object> primaryKeyMap = (Map<String, Object>) + PhoenixStorageHandlerUtil.toMap(((BytesWritable) + fieldValueList.get(3)).getBytes()); + for (Object pkValue : primaryKeyMap.values()) { + pResultWritable.add(pkValue); + } + } + + break; + default: + throw new RuntimeException(new SerDeException("Phoenix unsupported column type: " + + fieldOI.getCategory())); + } + } + } + + return pResultWritable; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f34c186a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java new file mode 100644 index 0000000..e8b5b19 --- /dev/null +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hive; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; +import org.apache.hadoop.hive.ql.metadata.InputEstimator; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; +import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat; +import org.apache.phoenix.hive.mapreduce.PhoenixOutputFormat; +import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposer; +import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposerManager; +import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; + +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * This class manages the initial configuration of Phoenix/Hive tables and SerDe election + */ +@SuppressWarnings("deprecation") +public class PhoenixStorageHandler extends DefaultStorageHandler implements + HiveStoragePredicateHandler, InputEstimator { + + private static final Log LOG = LogFactory.getLog(PhoenixStorageHandler.class); + + public PhoenixStorageHandler() { + if (LOG.isDebugEnabled()) { + LOG.debug("PhoenixStorageHandler created"); + } + } + + @Override + public HiveMetaHook getMetaHook() { + return new PhoenixMetaHook(); + } + + @SuppressWarnings("rawtypes") + @Override + public Class<? extends OutputFormat> getOutputFormatClass() { + return PhoenixOutputFormat.class; + } + + @Override + public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> + jobProperties) { + configureJobProperties(tableDesc, jobProperties); + + if (LOG.isDebugEnabled()) { + LOG.debug("Configuring input job for table : " + tableDesc.getTableName()); + } + + // For initialization efficiency, inform the SerDe whether this is input or output work. + tableDesc.getProperties().setProperty(PhoenixStorageHandlerConstants.IN_OUT_WORK, + PhoenixStorageHandlerConstants.IN_WORK); + } + + @Override + public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> + jobProperties) { + configureJobProperties(tableDesc, jobProperties); + + if (LOG.isDebugEnabled()) { + LOG.debug("Configuring output job for table : " + tableDesc.getTableName()); + } + + // For initialization efficiency, inform the SerDe whether this is input or output work.
+ tableDesc.getProperties().setProperty(PhoenixStorageHandlerConstants.IN_OUT_WORK, + PhoenixStorageHandlerConstants.OUT_WORK); + } + + @Override + public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> + jobProperties) { + configureJobProperties(tableDesc, jobProperties); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + protected void configureJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) { + Properties tableProperties = tableDesc.getProperties(); + + String inputFormatClassName = + tableProperties.getProperty(PhoenixStorageHandlerConstants + .HBASE_INPUT_FORMAT_CLASS); + + if (LOG.isDebugEnabled()) { + LOG.debug(PhoenixStorageHandlerConstants.HBASE_INPUT_FORMAT_CLASS + " is " + + inputFormatClassName); + } + + Class<?> inputFormatClass; + try { + if (inputFormatClassName != null) { + inputFormatClass = JavaUtils.loadClass(inputFormatClassName); + } else { + inputFormatClass = PhoenixInputFormat.class; + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e); + } + + if (inputFormatClass != null) { + tableDesc.setInputFileFormatClass((Class<? extends InputFormat>) inputFormatClass); + } + + String tableName = tableProperties.getProperty(PhoenixStorageHandlerConstants + .PHOENIX_TABLE_NAME); + if (tableName == null) { + tableName = tableDesc.getTableName(); + tableProperties.setProperty(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME, + tableName); + } + + jobProperties.put(PhoenixConfigurationUtil.INPUT_TABLE_NAME, tableName); + jobProperties.put(PhoenixStorageHandlerConstants.ZOOKEEPER_QUORUM, tableProperties + .getProperty(PhoenixStorageHandlerConstants.ZOOKEEPER_QUORUM, + PhoenixStorageHandlerConstants.DEFAULT_ZOOKEEPER_QUORUM)); + jobProperties.put(PhoenixStorageHandlerConstants.ZOOKEEPER_PORT, tableProperties + .getProperty(PhoenixStorageHandlerConstants.ZOOKEEPER_PORT, String.valueOf + (PhoenixStorageHandlerConstants.DEFAULT_ZOOKEEPER_PORT))); + jobProperties.put(PhoenixStorageHandlerConstants.ZOOKEEPER_PARENT, tableProperties + .getProperty(PhoenixStorageHandlerConstants.ZOOKEEPER_PARENT, + PhoenixStorageHandlerConstants.DEFAULT_ZOOKEEPER_PARENT)); + + jobProperties.put(hive_metastoreConstants.META_TABLE_STORAGE, this.getClass().getName()); + + // Set configuration for working directly with HBase. + jobProperties.put(HConstants.ZOOKEEPER_QUORUM, jobProperties.get + (PhoenixStorageHandlerConstants.ZOOKEEPER_QUORUM)); + jobProperties.put(HConstants.ZOOKEEPER_CLIENT_PORT, jobProperties.get + (PhoenixStorageHandlerConstants.ZOOKEEPER_PORT)); + jobProperties.put(HConstants.ZOOKEEPER_ZNODE_PARENT, jobProperties.get + (PhoenixStorageHandlerConstants.ZOOKEEPER_PARENT)); + } + + @Override + public Class<?
extends SerDe> getSerDeClass() { + return PhoenixSerDe.class; + } + + @Override + public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, + ExprNodeDesc predicate) { + PhoenixSerDe phoenixSerDe = (PhoenixSerDe) deserializer; + String tableName = phoenixSerDe.getTableProperties().getProperty + (PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME); + String predicateKey = PhoenixStorageHandlerUtil.getTableKeyOfSession(jobConf, tableName); + + if (LOG.isDebugEnabled()) { + LOG.debug("Decomposing predicate with predicateKey : " + predicateKey); + } + + List<String> columnNameList = phoenixSerDe.getSerdeParams().getColumnNames(); + PhoenixPredicateDecomposer predicateDecomposer = PhoenixPredicateDecomposerManager + .createPredicateDecomposer(predicateKey, columnNameList); + + return predicateDecomposer.decomposePredicate(predicate); + } + + @Override + public Estimation estimate(JobConf job, TableScanOperator ts, long remaining) throws + HiveException { + String hiveTableName = ts.getConf().getTableMetadata().getTableName(); + int reducerCount = job.getInt(hiveTableName + PhoenixStorageHandlerConstants + .PHOENIX_REDUCER_NUMBER, 1); + + if (LOG.isDebugEnabled()) { + LOG.debug("Estimating input size for table: " + hiveTableName + " with reducer count " + + reducerCount + ". Remaining : " + remaining); + } + + long bytesPerReducer = job.getLong(HiveConf.ConfVars.BYTESPERREDUCER.varname, + Long.parseLong(HiveConf.ConfVars.BYTESPERREDUCER.getDefaultValue())); + long totalLength = reducerCount * bytesPerReducer; + + return new Estimation(0, totalLength); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f34c186a/phoenix-hive/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java new file mode 100644 index 0000000..07c374e --- /dev/null +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.hive.constants; + +import com.google.common.collect.Lists; +import org.apache.hadoop.io.IntWritable; + +import java.util.List; + +/** + * Constants used by the Hive Storage Handler implementation + */ +public class PhoenixStorageHandlerConstants { + + public static final String HBASE_INPUT_FORMAT_CLASS = "phoenix.input.format.class"; + + public static final String PHOENIX_TABLE_NAME = "phoenix.table.name"; + + public static final String DEFAULT_PHOENIX_INPUT_CLASS = "org.apache.phoenix.hive.mapreduce" + + ".PhoenixResultWritable"; + + public static final String ZOOKEEPER_QUORUM = "phoenix.zookeeper.quorum"; + public static final String ZOOKEEPER_PORT = "phoenix.zookeeper.client.port"; + public static final String ZOOKEEPER_PARENT = "phoenix.zookeeper.znode.parent"; + public static final String DEFAULT_ZOOKEEPER_QUORUM = "localhost"; + public static final int DEFAULT_ZOOKEEPER_PORT = 2181; + public static final String DEFAULT_ZOOKEEPER_PARENT = "/hbase"; + + public static final String PHOENIX_ROWKEYS = "phoenix.rowkeys"; + public static final String PHOENIX_COLUMN_MAPPING = "phoenix.column.mapping"; + public static final String PHOENIX_TABLE_OPTIONS = "phoenix.table.options"; + + public static final String PHOENIX_TABLE_QUERY_HINT = ".query.hint"; + public static final String PHOENIX_REDUCER_NUMBER = ".reducer.count"; + public static final String DISABLE_WAL = ".disable.wal"; + public static final String BATCH_MODE = "batch.mode"; + public static final String AUTO_FLUSH = ".auto.flush"; + + public static final String COLON = ":"; + public static final String COMMA = ","; + public static final String EMPTY_STRING = ""; + public static final String SPACE = " "; + public static final String LEFT_ROUND_BRACKET = "("; + public static final String RIGHT_ROUND_BRACKET = ")"; + public static final String QUOTATION_MARK = "'"; + public static final String EQUAL = "="; + public static final String IS = "is"; + public static final String QUESTION = "?"; + + public static final String SPLIT_BY_STATS = "split.by.stats"; + public static final String HBASE_SCAN_CACHE = "hbase.scan.cache"; + public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock"; + public static final String HBASE_DATE_FORMAT = "hbase.date.format"; + public static final String HBASE_TIMESTAMP_FORMAT = "hbase.timestamp.format"; + public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd"; + public static final String DEFAULT_TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"; + + public static final String IN_OUT_WORK = "in.out.work"; + public static final String IN_WORK = "input"; + public static final String OUT_WORK = "output"; + + public static final String MR = "mr"; + public static final String TEZ = "tez"; + public static final String SPARK = "spark"; + + public static final String DATE_TYPE = "date"; + public static final String TIMESTAMP_TYPE = "timestamp"; + public static final String BETWEEN_COMPARATOR = "between"; + public static final String IN_COMPARATOR = "in"; + public static final List<String> COMMON_COMPARATOR = Lists.newArrayList("=", "<", ">", "<=", + ">="); + + // date/timestamp + public static final String COLUMNE_MARKER = "$columnName$"; + public static final String PATERN_MARKER = "$targetPattern$"; + public static final String DATE_PATTERN = "'?\\d{4}-\\d{2}-\\d{2}'?"; + public static final String TIMESTAMP_PATTERN = "'?\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\" + + ".?\\d{0,3}'?"; + public static final String COMMON_OPERATOR_PATTERN = "(\\(?"
+ COLUMNE_MARKER + "\\)?\\s*" + + "(=|>|<|<=|>=)\\s*(" + PATERN_MARKER + "))"; + public static final String BETWEEN_OPERATOR_PATTERN = "(\\(?" + COLUMNE_MARKER + "\\)?\\s*(" + + "(?i)not)?\\s*(?i)between\\s*(" + PATERN_MARKER + ")\\s*(?i)and\\s*(" + PATERN_MARKER + + "))"; + public static final String IN_OPERATOR_PATTERN = "(\\(?" + COLUMNE_MARKER + "\\)?\\s*((?i)" + + "not)?\\s*(?i)in\\s*\\((" + PATERN_MARKER + ",?\\s*)+\\))"; + + public static final String FUNCTION_VALUE_MARKER = "$value$"; + public static final String DATE_FUNCTION_TEMPLETE = "to_date(" + FUNCTION_VALUE_MARKER + ")"; + public static final String TIMESTAMP_FUNCTION_TEMPLATE = "to_timestamp(" + + FUNCTION_VALUE_MARKER + ")"; + + public static final IntWritable INT_ZERO = new IntWritable(0); +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f34c186a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java new file mode 100644 index 0000000..c5f6d18 --- /dev/null +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.hive.mapreduce; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RegionSizeCalculator; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; +import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposer; +import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposerManager; +import org.apache.phoenix.hive.ql.index.IndexSearchCondition; +import org.apache.phoenix.hive.query.PhoenixQueryBuilder; +import org.apache.phoenix.hive.util.PhoenixConnectionUtil; +import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil; +import org.apache.phoenix.iterate.MapReduceParallelScanGrouper; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.util.PhoenixRuntime; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Custom InputFormat to feed into Hive + */ +@SuppressWarnings({"deprecation", "rawtypes"}) +public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<WritableComparable, + T> { + + private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class); + + public PhoenixInputFormat() { + if (LOG.isDebugEnabled()) { + LOG.debug("PhoenixInputFormat created"); + } + } + + @Override + public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { + String tableName = jobConf.get(PhoenixConfigurationUtil.INPUT_TABLE_NAME); + List<IndexSearchCondition> conditionList = null; + String query; + String executionEngine = jobConf.get(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.getDefaultValue()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Target table name at split phase : " + tableName + " with whereCondition : " + + jobConf.get(TableScanDesc.FILTER_TEXT_CONF_STR) + + " and " + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname + " : " + + executionEngine); + } + + if (PhoenixStorageHandlerConstants.MR.equals(executionEngine)) { + String predicateKey =
PhoenixStorageHandlerUtil.getTableKeyOfSession(jobConf, + tableName); + + if (LOG.isDebugEnabled()) { + LOG.debug("PredicateKey for MR job : " + predicateKey); + } + + PhoenixPredicateDecomposer predicateDecomposer = + PhoenixPredicateDecomposerManager.getPredicateDecomposer(predicateKey); + if (predicateDecomposer != null && predicateDecomposer.isCalledPPD()) { + conditionList = predicateDecomposer.getSearchConditionList(); + } + + query = PhoenixQueryBuilder.getInstance().buildQuery(jobConf, tableName, + ColumnProjectionUtils.getReadColumnNames(jobConf), conditionList); + } else if (PhoenixStorageHandlerConstants.TEZ.equals(executionEngine)) { + Map<String, String> columnTypeMap = PhoenixStorageHandlerUtil.createColumnTypeMap + (jobConf); + if (LOG.isDebugEnabled()) { + LOG.debug("Column type map for TEZ : " + columnTypeMap); + } + + String whereClause = jobConf.get(TableScanDesc.FILTER_TEXT_CONF_STR); + query = PhoenixQueryBuilder.getInstance().buildQuery(jobConf, tableName, + ColumnProjectionUtils.getReadColumnNames(jobConf), whereClause, columnTypeMap); + } else { + throw new IOException(executionEngine + " execution engine is not supported yet."); + } + + final QueryPlan queryPlan = getQueryPlan(jobConf, query); + final List<KeyRange> allSplits = queryPlan.getSplits(); + final List<InputSplit> splits = generateSplits(jobConf, queryPlan, allSplits, query); + + return splits.toArray(new InputSplit[splits.size()]); + } + + private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan, + final List<KeyRange> splits, String query) throws + IOException { + Preconditions.checkNotNull(qplan); + Preconditions.checkNotNull(splits); + final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size()); + + Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims() + .newJobContext(new Job(jobConf))); + boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS, + false); + int scanCacheSize = jobConf.getInt(PhoenixStorageHandlerConstants.HBASE_SCAN_CACHE, -1); + + if (LOG.isDebugEnabled()) { + LOG.debug("Generating splits with scanCacheSize : " + scanCacheSize); + } + + // Add region locality information. + HConnection connection = HConnectionManager.createConnection(jobConf); + RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan + .getTableRef().getTable().getPhysicalName().toString())); + RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection + .getAdmin()); + + for (List<Scan> scans : qplan.getScans()) { + PhoenixInputSplit inputSplit; + + HRegionLocation location = regionLocator.getRegionLocation(scans.get(0).getStartRow() + , false); + long regionSize = sizeCalculator.getRegionSize(location.getRegionInfo().getRegionName + ()); + String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG); + + if (splitByStats) { + for (Scan aScan : scans) { + if (scanCacheSize > 0) { + aScan.setCaching(scanCacheSize); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Split for scan : " + aScan + " with scanAttribute : " + aScan + .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" + + aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan + .getBatch() + "] and regionLocation : " + regionLocation); + } + + inputSplit = new PhoenixInputSplit(Lists.newArrayList(aScan), tablePaths[0], + regionLocation, regionSize); + inputSplit.setQuery(query); + psplits.add(inputSplit); + } + } else { + if (scanCacheSize > 0) { + for (Scan aScan :
scans) { + aScan.setCaching(scanCacheSize); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans + .get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans + .size() - 1).getStopRow())); + LOG.debug("First scan : " + scans.get(0) + " with scanAttribute : " + scans + .get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " + + "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks() + + ", " + scans.get(0).getBatch() + "] and regionLocation : " + + regionLocation); + + for (int i = 0, limit = scans.size(); i < limit; i++) { + LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes + .toStringBinary(scans.get(i).getAttribute + (BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY))); + } + } + + inputSplit = new PhoenixInputSplit(scans, tablePaths[0], regionLocation, + regionSize); + inputSplit.setQuery(query); + psplits.add(inputSplit); + } + } + + return psplits; + } + + @Override + public RecordReader<WritableComparable, T> getRecordReader(InputSplit split, JobConf job, + Reporter reporter) throws + IOException { + final QueryPlan queryPlan = getQueryPlan(job, ((PhoenixInputSplit) split).getQuery()); + @SuppressWarnings("unchecked") + final Class<T> inputClass = (Class<T>) job.getClass(PhoenixConfigurationUtil.INPUT_CLASS, + PhoenixResultWritable.class); + + PhoenixRecordReader<T> recordReader = new PhoenixRecordReader<T>(inputClass, job, + queryPlan); + recordReader.initialize(split); + + return recordReader; + } + + /** + * Returns the query plan associated with the select query. + */ + private QueryPlan getQueryPlan(final Configuration configuration, String selectStatement) + throws IOException { + try { + final String currentScnValue = configuration.get(PhoenixConfigurationUtil + .CURRENT_SCN_VALUE); + final Properties overridingProps = new Properties(); + if (currentScnValue != null) { + overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue); + } + final Connection connection = PhoenixConnectionUtil.getInputConnection(configuration, + overridingProps); + Preconditions.checkNotNull(selectStatement); + final Statement statement = connection.createStatement(); + final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); + + if (LOG.isDebugEnabled()) { + LOG.debug("Compiled query : " + selectStatement); + } + + // Optimize the query plan so that we potentially use secondary indexes + final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement); + // Initialize the query plan so it sets up the parallel scans + queryPlan.iterator(MapReduceParallelScanGrouper.getInstance()); + return queryPlan; + } catch (Exception exception) { + LOG.error(String.format("Failed to get the query plan with error [%s]", exception.getMessage())); + throw new RuntimeException(exception); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f34c186a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputSplit.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputSplit.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputSplit.java new file mode 100644 index 0000000..d76e863 --- /dev/null +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputSplit.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hive.mapreduce; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.phoenix.query.KeyRange; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +/** + * InputSplit implementation. Represents the data to be processed by an individual Mapper + */ +public class PhoenixInputSplit extends FileSplit implements InputSplit { + + private List<Scan> scans; + private KeyRange keyRange; + + private long regionSize; + + // query is in the split because it is not delivered in jobConf. + private String query; + + public PhoenixInputSplit() { + } + + public PhoenixInputSplit(final List<Scan> scans, Path dummyPath, String regionLocation, long + length) { + super(dummyPath, 0, 0, new String[]{regionLocation}); + + regionSize = length; + + Preconditions.checkNotNull(scans); + Preconditions.checkState(!scans.isEmpty()); + this.scans = scans; + init(); + } + + public List<Scan> getScans() { + return scans; + } + + public KeyRange getKeyRange() { + return keyRange; + } + + public String getQuery() { + return query; + } + + public void setQuery(String query) { + this.query = query; + } + + private void init() { + this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size() - + 1).getStopRow()); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + + Preconditions.checkNotNull(scans); + WritableUtils.writeVInt(out, scans.size()); + for (Scan scan : scans) { + ClientProtos.Scan protoScan = ProtobufUtil.toScan(scan); + byte[] protoScanBytes = protoScan.toByteArray(); + WritableUtils.writeVInt(out, protoScanBytes.length); + out.write(protoScanBytes); + } + + WritableUtils.writeString(out, query); + WritableUtils.writeVLong(out, regionSize); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + + int count = WritableUtils.readVInt(in); + scans = Lists.newArrayListWithExpectedSize(count); + for (int i = 0; i < count; i++) { + byte[] protoScanBytes = new byte[WritableUtils.readVInt(in)]; + in.readFully(protoScanBytes); + ClientProtos.Scan protoScan = ClientProtos.Scan.parseFrom(protoScanBytes); + Scan scan = ProtobufUtil.toScan(protoScan); + scans.add(scan); + } + init(); + + query = WritableUtils.readString(in); + regionSize = WritableUtils.readVLong(in); + } + + @Override + public long getLength() { + return regionSize; + } + + @Override + 
+
+    @Override
+    public String[] getLocations() throws IOException {
+        return new String[]{};
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + keyRange.hashCode();
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof PhoenixInputSplit)) {
+            return false;
+        }
+        PhoenixInputSplit other = (PhoenixInputSplit) obj;
+        if (keyRange == null) {
+            if (other.keyRange != null) {
+                return false;
+            }
+        } else if (!keyRange.equals(other.keyRange)) {
+            return false;
+        }
+        return true;
+    }
+}
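PhoenixInputSplit hand-rolls its Writable serialization above: each Scan is protobuf-encoded, and the query string and region size ride along explicitly. A round-trip through Hadoop's buffer classes is a quick way to sanity-check that write/readFields pair. This is a minimal sketch, not part of the patch; it assumes an HBase/Phoenix client classpath, and the table, row keys, path, and host name are placeholders.

import java.util.Collections;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.phoenix.hive.mapreduce.PhoenixInputSplit;

public class PhoenixInputSplitRoundTrip {
    public static void main(String[] args) throws Exception {
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("row-000"));
        scan.setStopRow(Bytes.toBytes("row-999"));

        // Build a split the way the input format would: a dummy path, a region
        // location hint, and the region size as the split length.
        PhoenixInputSplit split = new PhoenixInputSplit(
                Collections.singletonList(scan), new Path("/tmp/dummy"),
                "rs1.example.com", 1024L);
        split.setQuery("SELECT * FROM MY_TABLE");

        // Serialize via write(), then rehydrate a fresh instance via readFields().
        DataOutputBuffer out = new DataOutputBuffer();
        split.write(out);

        PhoenixInputSplit copy = new PhoenixInputSplit();
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        copy.readFields(in);

        // The key range is recomputed in init(); query and length are read back directly.
        System.out.println("query  : " + copy.getQuery());
        System.out.println("length : " + copy.getLength());
        System.out.println("range  : " + copy.getKeyRange());
    }
}

Note that getLocations() returns an empty array, so the regionLocation passed to the constructor is held by FileSplit but does not influence task placement.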
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f34c186a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixOutputFormat.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixOutputFormat.java
new file mode 100644
index 0000000..ed47176
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixOutputFormat.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Custom OutputFormat to feed into Hive. Describes the output-specification for a Map-Reduce job.
+ */
+public class PhoenixOutputFormat<T extends DBWritable> implements OutputFormat<NullWritable, T>,
+        AcidOutputFormat<NullWritable, T> {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
+
+    public PhoenixOutputFormat() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("PhoenixOutputFormat created");
+        }
+    }
+
+    @Override
+    public RecordWriter<NullWritable, T> getRecordWriter(FileSystem ignored, JobConf jobConf,
+            String name, Progressable progress) throws IOException {
+        return createRecordWriter(jobConf, new Properties());
+    }
+
+    @Override
+    public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+    }
+
+    @Override
+    public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+            JobConf jobConf, Path finalOutPath, Class<? extends Writable> valueClass,
+            boolean isCompressed, Properties tableProperties, Progressable progress)
+            throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Get RecordWriter for finalOutPath : " + finalOutPath + ", valueClass : "
+                    + valueClass.getName() + ", isCompressed : " + isCompressed
+                    + ", tableProperties : " + tableProperties + ", progress : " + progress);
+        }
+
+        return createRecordWriter(jobConf, new Properties());
+    }
+
+    @Override
+    public RecordUpdater getRecordUpdater(Path path, AcidOutputFormat.Options options)
+            throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Get RecordUpdater for path : " + path + ", options : "
+                    + PhoenixStorageHandlerUtil.getOptionsValue(options));
+        }
+        return new PhoenixRecordWriter<T>(path, options);
+    }
+
+    @Override
+    public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getRawRecordWriter(
+            Path path, AcidOutputFormat.Options options) throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Get RawRecordWriter for path : " + path + ", options : "
+                    + PhoenixStorageHandlerUtil.getOptionsValue(options));
+        }
+
+        return new PhoenixRecordWriter<T>(path, options);
+    }
+
+    private PhoenixRecordWriter<T> createRecordWriter(Configuration config,
+            Properties properties) {
+        try {
+            return new PhoenixRecordWriter<T>(config, properties);
+        } catch (SQLException e) {
+            LOG.error("Error during PhoenixRecordWriter instantiation : " + e.getMessage());
+            throw new RuntimeException(e);
+        }
+    }
+}
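Hive drives this class either through the plain OutputFormat surface or, for ACID tables, through getRecordUpdater(). The sketch below walks the non-ACID path. It is illustrative only: the property keys stand in for the PhoenixStorageHandlerConstants values that the storage handler normally injects from Hive table properties, and the getRecordWriter() call only succeeds against a reachable Phoenix/HBase cluster, since PhoenixRecordWriter opens a JDBC connection rather than an HDFS file.

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.phoenix.hive.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.hive.mapreduce.PhoenixResultWritable;

public class PhoenixOutputFormatSketch {
    public static void main(String[] args) throws Exception {
        JobConf jobConf = new JobConf();
        // Illustrative keys: stand-ins for the constants the storage handler sets.
        jobConf.set("phoenix.table.name", "MY_TABLE");
        jobConf.set("phoenix.zookeeper.quorum", "zk1.example.com");

        PhoenixOutputFormat<PhoenixResultWritable> outputFormat =
                new PhoenixOutputFormat<PhoenixResultWritable>();

        // The FileSystem argument is ignored; the writer talks JDBC, not HDFS.
        RecordWriter<NullWritable, PhoenixResultWritable> writer =
                outputFormat.getRecordWriter(null, jobConf, "attempt_0", null);

        // ... write PhoenixResultWritable values here ...

        // Closing flushes any batched upserts and releases the connection.
        writer.close(null);
    }
}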
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f34c186a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
new file mode 100644
index 0000000..325efe2
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.mapreduce;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.hive.PhoenixRowKey;
+import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.iterate.PeekingResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.RoundRobinResultIterator;
+import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+
+/**
+ * RecordReader implementation that iterates over the records.
+ */
+@SuppressWarnings("rawtypes")
+public class PhoenixRecordReader<T extends DBWritable> implements
+        RecordReader<WritableComparable, T> {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class);
+
+    private final Configuration configuration;
+    private final QueryPlan queryPlan;
+    private WritableComparable key;
+    private T value = null;
+    private Class<T> inputClass;
+    private ResultIterator resultIterator = null;
+    private PhoenixResultSet resultSet;
+    private long readCount;
+
+    private boolean isTransactional;
+
+    public PhoenixRecordReader(Class<T> inputClass, final Configuration configuration,
+            final QueryPlan queryPlan) throws IOException {
+        this.inputClass = inputClass;
+        this.configuration = configuration;
+        this.queryPlan = queryPlan;
+
+        isTransactional = PhoenixStorageHandlerUtil.isTransactionalTable(configuration);
+    }
+
+    public void initialize(InputSplit split) throws IOException {
+        final PhoenixInputSplit pSplit = (PhoenixInputSplit) split;
+        final List<Scan> scans = pSplit.getScans();
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Target table : " + queryPlan.getTableRef().getTable().getPhysicalName());
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Scan count[" + scans.size() + "] : "
+                    + Bytes.toStringBinary(scans.get(0).getStartRow()) + " ~ "
+                    + Bytes.toStringBinary(scans.get(scans.size() - 1).getStopRow()));
+            LOG.debug("First scan : " + scans.get(0) + " scanAttribute : "
+                    + scans.get(0).getAttributesMap());
+
+            for (int i = 0, limit = scans.size(); i < limit; i++) {
+                LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes.toStringBinary(
+                        scans.get(i).getAttribute(BaseScannerRegionObserver
+                                .EXPECTED_UPPER_REGION_KEY)));
+            }
+        }
+
+        try {
+            List<PeekingResultIterator> iterators =
+                    Lists.newArrayListWithExpectedSize(scans.size());
+            StatementContext ctx = queryPlan.getContext();
+            ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
+            String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
+            long renewScannerLeaseThreshold = queryPlan.getContext().getConnection()
+                    .getQueryServices().getRenewLeaseThresholdMilliSeconds();
+            for (Scan scan : scans) {
+                scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK,
+                        Bytes.toBytes(true));
+                final TableResultIterator tableResultIterator = new TableResultIterator(
+                        queryPlan.getContext().getConnection().getMutationState(), scan,
+                        readMetrics.allotMetric(SCAN_BYTES, tableName),
+                        renewScannerLeaseThreshold, queryPlan,
+                        MapReduceParallelScanGrouper.getInstance());
+
+                PeekingResultIterator peekingResultIterator =
+                        LookAheadResultIterator.wrap(tableResultIterator);
+                iterators.add(peekingResultIterator);
+            }
+            ResultIterator iterator = queryPlan.useRoundRobinIterator()
+                    ? RoundRobinResultIterator.newIterator(iterators, queryPlan)
+                    : ConcatResultIterator.newIterator(iterators);
+            if (queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
+                iterator = new SequenceResultIterator(iterator,
+                        queryPlan.getContext().getSequenceManager());
+            }
+            this.resultIterator = iterator;
+            // Clone the row projector, as it is not thread safe and would otherwise be
+            // used simultaneously by multiple threads.
+            this.resultSet = new PhoenixResultSet(this.resultIterator,
+                    queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext());
+        } catch (SQLException e) {
+            LOG.error(String.format("Error [%s] initializing PhoenixRecordReader.",
+                    e.getMessage()));
+            Throwables.propagate(e);
+        }
+    }
", e + .getMessage())); + Throwables.propagate(e); + } + } + + @Override + public boolean next(WritableComparable key, T value) throws IOException { + try { + if (!resultSet.next()) { + return false; + } + value.readFields(resultSet); + + if (isTransactional) { + ((PhoenixResultWritable) value).readPrimaryKey((PhoenixRowKey) key); + } + + ++readCount; + + if (LOG.isTraceEnabled()) { + LOG.trace("Result[" + readCount + "] : " + ((PhoenixResultWritable) value) + .getResultMap()); + } + + return true; + } catch (SQLException e) { + LOG.error(String.format(" Error [%s] occurred while iterating over the resultset. ", + e.getMessage())); + throw new RuntimeException(e); + } + } + + @Override + public WritableComparable createKey() { + if (isTransactional) { + key = new PhoenixRowKey(); + } else { + key = NullWritable.get(); + } + + return key; + } + + @Override + public T createValue() { + value = ReflectionUtils.newInstance(inputClass, this.configuration); + return value; + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public void close() throws IOException { + if (LOG.isInfoEnabled()) { + LOG.info("Read Count : " + readCount); + } + + if (resultIterator != null) { + try { + resultIterator.close(); + } catch (SQLException e) { + LOG.error(" Error closing resultset."); + throw new RuntimeException(e); + } + } + + } + + @Override + public float getProgress() throws IOException { + return 0; + } +}