Repository: phoenix Updated Branches: refs/heads/master 05ff5618d -> c0d7a9fee
PHOENIX-2492 Expose PhoenixRecordWritable outside of spark Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c0d7a9fe Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c0d7a9fe Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c0d7a9fe Branch: refs/heads/master Commit: c0d7a9fee1a54eb20a870447e385e42cd6a4d81e Parents: 05ff561 Author: Nick Dimiduk <ndimi...@apache.org> Authored: Fri Dec 4 16:04:25 2015 -0800 Committer: Nick Dimiduk <ndimi...@apache.org> Committed: Tue Dec 22 15:56:32 2015 -0500 ---------------------------------------------------------------------- .../mapreduce/PhoenixRecordWritable.java | 192 +++++++++++++++++++ 1 file changed, 192 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c0d7a9fe/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java new file mode 100644 index 0000000..8d7d97a --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.schema.types.*; +import org.apache.phoenix.util.ColumnInfo; +import org.joda.time.DateTime; + +import java.sql.Array; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class PhoenixRecordWritable implements DBWritable { + + private final List<Object> upsertValues = new ArrayList<>(); + private final Map<String, Object> resultMap = new HashMap<>(); + private List<ColumnInfo> columnMetaDataList; + + /** For serialization; do not use. */ + public PhoenixRecordWritable() { + this(new ArrayList<ColumnInfo>()); + } + + public PhoenixRecordWritable(List<ColumnInfo> columnMetaDataList) { + this.columnMetaDataList = columnMetaDataList; + } + + /** + * Helper method to create a {@link Array} for a specific {@link PDataType}, and set it on + * the provided {@code stmt}. + */ + private static void setArrayInStatement(PreparedStatement stmt, PDataType<?> type, + Object[] obj, int position) throws SQLException { + Array sqlArray = stmt.getConnection().createArrayOf( + PDataType.arrayBaseType(type).getSqlTypeName(), obj); + stmt.setArray(position, sqlArray); + } + + private static Object[] primativeArrayToObjectArray(byte[] a) { + final Byte[] ret = new Byte[a.length]; + for (int i = 0; i < a.length; i++) { + ret[i] = a[i]; + } + return ret; + } + + private static Object[] primativeArrayToObjectArray(short[] a) { + final Short[] ret = new Short[a.length]; + for (int i = 0; i < a.length; i++) { + ret[i] = a[i]; + } + return ret; + } + + private static Object[] primativeArrayToObjectArray(int[] a) { + final Integer[] ret = new Integer[a.length]; + for (int i = 0; i < a.length; i++) { + ret[i] = a[i]; + } + return ret; + } + + private static Object[] primativeArrayToObjectArray(float[] a) { + final Float[] ret = new Float[a.length]; + for (int i = 0; i < a.length; i++) { + ret[i] = a[i]; + } + return ret; + } + + private static Object[] primativeArrayToObjectArray(double[] a) { + final Double[] ret = new Double[a.length]; + for (int i = 0; i < a.length; i++) { + ret[i] = a[i]; + } + return ret; + } + + private static Object[] primativeArrayToObjectArray(long[] a) { + final Long[] ret = new Long[a.length]; + for (int i = 0; i < a.length; i++) { + ret[i] = a[i]; + } + return ret; + } + + @Override public void write(PreparedStatement statement) throws SQLException { + // make sure we at least line up in size + if (upsertValues.size() != columnMetaDataList.size()) { + throw new UnsupportedOperationException("Provided " + upsertValues.size() + + " upsert values, but column metadata expects " + columnMetaDataList.size() + + " columns."); + } + + // correlate each value (v) to a column type (c) and an index (i) + for (int i = 0; i < upsertValues.size(); i++) { + Object v = upsertValues.get(i); + ColumnInfo c = columnMetaDataList.get(i); + + if (v == null) { + statement.setNull(i + 1, c.getSqlType()); + continue; + } + + // both Java and Joda dates used to work in 4.2.3, but now they must be java.sql.Date + // can override any other types here as needed + final Object finalObj; + final PDataType<?> finalType; + if (v instanceof DateTime) { + finalObj = new java.sql.Date(((DateTime) v).getMillis()); + finalType = PDate.INSTANCE; + } else if (v instanceof java.util.Date) { + finalObj = new java.sql.Date(((java.util.Date) v).getTime()); + finalType = PDate.INSTANCE; + } else { + finalObj = v; + finalType = c.getPDataType(); + } + + if (finalObj instanceof Object[]) { + setArrayInStatement(statement, finalType, (Object[]) finalObj, i + 1); + } else if (finalObj instanceof byte[]) { + // PVarbinary and PBinary are provided as byte[] but are treated as SQL objects + if (PDataType.equalsAny(finalType, PVarbinary.INSTANCE, PBinary.INSTANCE)) { + statement.setObject(i + 1, finalObj); + } + // otherwise set as array type + setArrayInStatement(statement, finalType, primativeArrayToObjectArray((byte[]) finalObj), i + 1); + } else if (finalObj instanceof short[]) { + setArrayInStatement(statement, finalType, primativeArrayToObjectArray((short[]) finalObj), i + 1); + } else if (finalObj instanceof int[]) { + setArrayInStatement(statement, finalType, primativeArrayToObjectArray((int[]) finalObj), i + 1); + } else if (finalObj instanceof long[]) { + setArrayInStatement(statement, finalType, primativeArrayToObjectArray((long[]) finalObj), i + 1); + } else if (finalObj instanceof float[]) { + setArrayInStatement(statement, finalType, primativeArrayToObjectArray((float[]) finalObj), i + 1); + } else if (finalObj instanceof double[]) { + setArrayInStatement(statement, finalType, primativeArrayToObjectArray((double[]) finalObj), i + 1); + } else { + statement.setObject(i + 1, finalObj); + } + } + } + + @Override public void readFields(ResultSet resultSet) throws SQLException { + ResultSetMetaData metaData = resultSet.getMetaData(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + // return the contents of a PhoenixArray, if necessary + Object value = resultSet.getObject(i); + if (value instanceof PhoenixArray) { + value = ((PhoenixArray) value).getArray(); + } + + // put a (ColumnLabel -> value) entry into the result map + resultMap.put(metaData.getColumnLabel(i), value); + } + } + + /** Append an object to the list of values to upsert. */ + public void add(Object value) { + upsertValues.add(value); + } + + /** @return an immutable view on the {@link ResultSet} content. */ + public Map<String, Object> getResultMap() { + return Collections.unmodifiableMap(resultMap); + } +}