Repository: hive Updated Branches: refs/heads/master 18fb1b3f3 -> 1105ef397
http://git-wip-us.apache.org/repos/asf/hive/blob/1105ef39/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java index 2bb4a0f..f1c8477 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/JsonSerDe.java @@ -20,30 +20,21 @@ package org.apache.hadoop.hive.serde2; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.nio.charset.CharacterCodingException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.type.Date; -import org.apache.hadoop.hive.common.type.HiveChar; -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.common.type.Timestamp; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.json.HiveJsonStructReader; import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; @@ -61,10 
+52,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspect import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -73,42 +60,32 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.TimestampParser; -import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.JsonParseException; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.JsonToken; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, - serdeConstants.LIST_COLUMN_TYPES, - serdeConstants.TIMESTAMP_FORMATS}) + serdeConstants.LIST_COLUMN_TYPES, + serdeConstants.TIMESTAMP_FORMATS }) -// FIXME: move TestJsonSerDe from hcat to serde2 public class JsonSerDe extends AbstractSerDe { private static final Logger LOG = LoggerFactory.getLogger(JsonSerDe.class); private List<String> columnNames; - private StructTypeInfo schema; - private JsonFactory jsonFactory = null; - - private StandardStructObjectInspector cachedObjectInspector; - private TimestampParser tsParser; + private HiveJsonStructReader structReader; + private StructTypeInfo rowTypeInfo; @Override public void initialize(Configuration conf, Properties tbl) throws SerDeException { List<TypeInfo> columnTypes; - 
StructTypeInfo rowTypeInfo; - LOG.debug("Initializing JsonSerDe: {}", tbl.entrySet()); - // Get column names and types + // Get column names String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS); - String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl - .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA); + .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) + : String.valueOf(SerDeUtils.COMMA); // all table column names if (columnNameProperty.isEmpty()) { columnNames = Collections.emptyList(); @@ -117,6 +94,7 @@ public class JsonSerDe extends AbstractSerDe { } // all column types + String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); if (columnTypeProperty.isEmpty()) { columnTypes = Collections.emptyList(); } else { @@ -129,298 +107,34 @@ public class JsonSerDe extends AbstractSerDe { assert (columnNames.size() == columnTypes.size()); rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); - schema = rowTypeInfo; - LOG.debug("schema : {}", schema); - cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo); - jsonFactory = new JsonFactory(); - tsParser = new TimestampParser( - HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS))); + TimestampParser tsParser = new TimestampParser( + HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS))); + structReader = new HiveJsonStructReader(rowTypeInfo, tsParser); + structReader.setIgnoreUnknownFields(true); + structReader.enableHiveColIndexParsing(true); + structReader.setWritablesUsage(true); } /** * Takes JSON string in Text form, and has to return an object representation above * it that's readable by the corresponding object inspector. 
+ * * For this implementation, since we're using the jackson parser, we can construct * our own object implementation, and we use HCatRecord for it */ @Override public Object deserialize(Writable blob) throws SerDeException { + Object row; Text t = (Text) blob; - JsonParser p; - List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null)); try { - p = jsonFactory.createJsonParser(new ByteArrayInputStream((t.getBytes()))); - if (p.nextToken() != JsonToken.START_OBJECT) { - throw new IOException("Start token not found where expected"); - } - JsonToken token; - while (((token = p.nextToken()) != JsonToken.END_OBJECT) && (token != null)) { - // iterate through each token, and create appropriate object here. - populateRecord(r, token, p, schema); - } - } catch (JsonParseException e) { - LOG.warn("Error [{}] parsing json text [{}].", e, t); - throw new SerDeException(e); - } catch (IOException e) { + row = structReader.parseStruct(new ByteArrayInputStream((t.getBytes()), 0, t.getLength())); + return row; + } catch (Exception e) { LOG.warn("Error [{}] parsing json text [{}].", e, t); throw new SerDeException(e); } - - return r; - } - - private void populateRecord(List<Object> r, JsonToken token, JsonParser p, StructTypeInfo s) throws IOException { - if (token != JsonToken.FIELD_NAME) { - throw new IOException("Field name expected"); - } - String fieldName = p.getText().toLowerCase(); - int fpos = s.getAllStructFieldNames().indexOf(fieldName); - if (fpos == -1) { - fpos = getPositionFromHiveInternalColumnName(fieldName); - LOG.debug("NPE finding position for field [{}] in schema [{}]," - + " attempting to check if it is an internal column name like _col0", fieldName, s); - if (fpos == -1) { - skipValue(p); - return; // unknown field, we return. We'll continue from the next field onwards. 
- } - // If we get past this, then the column name did match the hive pattern for an internal - // column name, such as _col0, etc, so it *MUST* match the schema for the appropriate column. - // This means people can't use arbitrary column names such as _col0, and expect us to ignore it - // if we find it. - if (!fieldName.equalsIgnoreCase(getHiveInternalColumnName(fpos))) { - LOG.error("Hive internal column name {} and position " - + "encoding {} for the column name are at odds", fieldName, fpos); - throw new IOException("Hive internal column name (" + fieldName - + ") and position encoding (" + fpos - + ") for the column name are at odds"); - } - // If we reached here, then we were successful at finding an alternate internal - // column mapping, and we're about to proceed. - } - Object currField = extractCurrentField(p, s.getStructFieldTypeInfo(fieldName), false); - r.set(fpos, currField); - } - - public String getHiveInternalColumnName(int fpos) { - return HiveConf.getColumnInternalName(fpos); - } - - public int getPositionFromHiveInternalColumnName(String internalName) { - // return HiveConf.getPositionFromInternalName(fieldName); - // The above line should have been all the implementation that - // we need, but due to a bug in that impl which recognizes - // only single-digit columns, we need another impl here. - Pattern internalPattern = Pattern.compile("_col([0-9]+)"); - Matcher m = internalPattern.matcher(internalName); - if (!m.matches()) { - return -1; - } else { - return Integer.parseInt(m.group(1)); - } - } - - /** - * Utility method to extract (and forget) the next value token from the JsonParser, - * as a whole. The reason this function gets called is to yank out the next value altogether, - * because it corresponds to a field name that we do not recognize, and thus, do not have - * a schema/type for. Thus, this field is to be ignored. 
- * - * @throws IOException - * @throws JsonParseException - */ - private void skipValue(JsonParser p) throws JsonParseException, IOException { - JsonToken valueToken = p.nextToken(); - - if ((valueToken == JsonToken.START_ARRAY) || (valueToken == JsonToken.START_OBJECT)) { - // if the currently read token is a beginning of an array or object, move stream forward - // skipping any child tokens till we're at the corresponding END_ARRAY or END_OBJECT token - p.skipChildren(); - } - // At the end of this function, the stream should be pointing to the last token that - // corresponds to the value being skipped. This way, the next call to nextToken - // will advance it to the next field name. - } - - /** - * Utility method to extract current expected field from given JsonParser - * isTokenCurrent is a boolean variable also passed in, which determines - * if the JsonParser is already at the token we expect to read next, or - * needs advancing to the next before we read. - */ - private Object extractCurrentField(JsonParser p, TypeInfo fieldTypeInfo, - boolean isTokenCurrent) throws IOException { - Object val = null; - JsonToken valueToken; - if (isTokenCurrent) { - valueToken = p.getCurrentToken(); - } else { - valueToken = p.nextToken(); - } - - switch (fieldTypeInfo.getCategory()) { - case PRIMITIVE: - PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = PrimitiveObjectInspector.PrimitiveCategory.UNKNOWN; - if (fieldTypeInfo instanceof PrimitiveTypeInfo) { - primitiveCategory = ((PrimitiveTypeInfo) fieldTypeInfo).getPrimitiveCategory(); - } - switch (primitiveCategory) { - case INT: - val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getIntValue(); - break; - case BYTE: - val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getByteValue(); - break; - case SHORT: - val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getShortValue(); - break; - case LONG: - val = (valueToken == JsonToken.VALUE_NULL) ? 
null : p.getLongValue(); - break; - case BOOLEAN: - String bval = (valueToken == JsonToken.VALUE_NULL) ? null : p.getText(); - if (bval != null) { - val = Boolean.valueOf(bval); - } else { - val = null; - } - break; - case FLOAT: - val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getFloatValue(); - break; - case DOUBLE: - val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getDoubleValue(); - break; - case STRING: - val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getText(); - break; - case BINARY: - String b = (valueToken == JsonToken.VALUE_NULL) ? null : p.getText(); - if (b != null) { - try { - String t = Text.decode(b.getBytes(), 0, b.getBytes().length); - return t.getBytes(); - } catch (CharacterCodingException e) { - LOG.warn("Error generating json binary type from object.", e); - return null; - } - } else { - val = null; - } - break; - case DATE: - val = (valueToken == JsonToken.VALUE_NULL) ? null : Date.valueOf(p.getText()); - break; - case TIMESTAMP: - val = (valueToken == JsonToken.VALUE_NULL) ? null : tsParser.parseTimestamp(p.getText()); - break; - case DECIMAL: - val = (valueToken == JsonToken.VALUE_NULL) ? null : HiveDecimal.create(p.getText()); - break; - case VARCHAR: - int vLen = ((BaseCharTypeInfo) fieldTypeInfo).getLength(); - val = (valueToken == JsonToken.VALUE_NULL) ? null : new HiveVarchar(p.getText(), vLen); - break; - case CHAR: - int cLen = ((BaseCharTypeInfo) fieldTypeInfo).getLength(); - val = (valueToken == JsonToken.VALUE_NULL) ? 
null : new HiveChar(p.getText(), cLen); - break; - } - break; - case LIST: - if (valueToken == JsonToken.VALUE_NULL) { - val = null; - break; - } - if (valueToken != JsonToken.START_ARRAY) { - throw new IOException("Start of Array expected"); - } - List<Object> arr = new ArrayList<Object>(); - while ((valueToken = p.nextToken()) != JsonToken.END_ARRAY) { - arr.add(extractCurrentField(p, ((ListTypeInfo)fieldTypeInfo).getListElementTypeInfo(), true)); - } - val = arr; - break; - case MAP: - if (valueToken == JsonToken.VALUE_NULL) { - val = null; - break; - } - if (valueToken != JsonToken.START_OBJECT) { - throw new IOException("Start of Object expected"); - } - Map<Object, Object> map = new LinkedHashMap<Object, Object>(); - while ((valueToken = p.nextToken()) != JsonToken.END_OBJECT) { - Object k = getObjectOfCorrespondingPrimitiveType(p.getCurrentName(), - (PrimitiveTypeInfo) ((MapTypeInfo)fieldTypeInfo).getMapKeyTypeInfo()); - Object v = extractCurrentField(p, ((MapTypeInfo) fieldTypeInfo).getMapValueTypeInfo(), false); - map.put(k, v); - } - val = map; - break; - case STRUCT: - if (valueToken == JsonToken.VALUE_NULL) { - val = null; - break; - } - if (valueToken != JsonToken.START_OBJECT) { - throw new IOException("Start of Object expected"); - } - ArrayList<TypeInfo> subSchema = ((StructTypeInfo)fieldTypeInfo).getAllStructFieldTypeInfos(); - int sz = subSchema.size(); - List<Object> struct = new ArrayList<Object>(Collections.nCopies(sz, null)); - while ((valueToken = p.nextToken()) != JsonToken.END_OBJECT) { - populateRecord(struct, valueToken, p, ((StructTypeInfo) fieldTypeInfo)); - } - val = struct; - break; - default: - LOG.error("Unknown type found: " + fieldTypeInfo); - return null; - } - return val; - } - - private Object getObjectOfCorrespondingPrimitiveType(String s, PrimitiveTypeInfo mapKeyType) - throws IOException { - switch (mapKeyType.getPrimitiveCategory()) { - case INT: - return Integer.valueOf(s); - case BYTE: - return Byte.valueOf(s); - case 
SHORT: - return Short.valueOf(s); - case LONG: - return Long.valueOf(s); - case BOOLEAN: - return (s.equalsIgnoreCase("true")); - case FLOAT: - return Float.valueOf(s); - case DOUBLE: - return Double.valueOf(s); - case STRING: - return s; - case BINARY: - try { - String t = Text.decode(s.getBytes(), 0, s.getBytes().length); - return t.getBytes(); - } catch (CharacterCodingException e) { - LOG.warn("Error generating json binary type from object.", e); - return null; - } - case DATE: - return Date.valueOf(s); - case TIMESTAMP: - return Timestamp.valueOf(s); - case DECIMAL: - return HiveDecimal.create(s); - case VARCHAR: - return new HiveVarchar(s, ((BaseCharTypeInfo) mapKeyType).getLength()); - case CHAR: - return new HiveChar(s, ((BaseCharTypeInfo) mapKeyType).getLength()); - } - throw new IOException("Could not convert from string to map type " + mapKeyType.getTypeName()); } /** @@ -462,7 +176,6 @@ public class JsonSerDe extends AbstractSerDe { private static StringBuilder appendWithQuotes(StringBuilder sb, String value) { return sb == null ? null : sb.append(SerDeUtils.QUOTE).append(value).append(SerDeUtils.QUOTE); } - // TODO : code section copied over from SerDeUtils because of non-standard json production there // should use quotes for all field names. We should fix this there, and then remove this copy. // See http://jackson.codehaus.org/1.7.3/javadoc/org/codehaus/jackson/JsonParser.Feature.html#ALLOW_UNQUOTED_FIELD_NAMES @@ -472,184 +185,184 @@ public class JsonSerDe extends AbstractSerDe { private static void buildJSONString(StringBuilder sb, Object o, ObjectInspector oi) throws IOException { switch (oi.getCategory()) { - case PRIMITIVE: { - PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi; - if (o == null) { - sb.append("null"); - } else { - switch (poi.getPrimitiveCategory()) { - case BOOLEAN: { - boolean b = ((BooleanObjectInspector) poi).get(o); - sb.append(b ? 
"true" : "false"); - break; - } - case BYTE: { - sb.append(((ByteObjectInspector) poi).get(o)); - break; - } - case SHORT: { - sb.append(((ShortObjectInspector) poi).get(o)); - break; - } - case INT: { - sb.append(((IntObjectInspector) poi).get(o)); - break; - } - case LONG: { - sb.append(((LongObjectInspector) poi).get(o)); - break; - } - case FLOAT: { - sb.append(((FloatObjectInspector) poi).get(o)); - break; - } - case DOUBLE: { - sb.append(((DoubleObjectInspector) poi).get(o)); - break; - } - case STRING: { - String s = - SerDeUtils.escapeString(((StringObjectInspector) poi).getPrimitiveJavaObject(o)); - appendWithQuotes(sb, s); - break; - } - case BINARY: - byte[] b = ((BinaryObjectInspector) oi).getPrimitiveJavaObject(o); - Text txt = new Text(); - txt.set(b, 0, b.length); - appendWithQuotes(sb, SerDeUtils.escapeString(txt.toString())); - break; - case DATE: - Date d = ((DateObjectInspector) poi).getPrimitiveJavaObject(o); - appendWithQuotes(sb, d.toString()); - break; - case TIMESTAMP: { - Timestamp t = ((TimestampObjectInspector) poi).getPrimitiveJavaObject(o); - appendWithQuotes(sb, t.toString()); - break; - } - case DECIMAL: - sb.append(((HiveDecimalObjectInspector) poi).getPrimitiveJavaObject(o)); - break; - case VARCHAR: { - String s = SerDeUtils.escapeString( - ((HiveVarcharObjectInspector) poi).getPrimitiveJavaObject(o).toString()); - appendWithQuotes(sb, s); - break; - } - case CHAR: { - //this should use HiveChar.getPaddedValue() but it's protected; currently (v0.13) - // HiveChar.toString() returns getPaddedValue() - String s = SerDeUtils.escapeString( - ((HiveCharObjectInspector) poi).getPrimitiveJavaObject(o).toString()); - appendWithQuotes(sb, s); - break; - } - default: - throw new RuntimeException("Unknown primitive type: " + poi.getPrimitiveCategory()); - } + case PRIMITIVE: { + PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi; + if (o == null) { + sb.append("null"); + } else { + switch (poi.getPrimitiveCategory()) { + case 
BOOLEAN: { + boolean b = ((BooleanObjectInspector) poi).get(o); + sb.append(b ? "true" : "false"); + break; + } + case BYTE: { + sb.append(((ByteObjectInspector) poi).get(o)); + break; + } + case SHORT: { + sb.append(((ShortObjectInspector) poi).get(o)); + break; + } + case INT: { + sb.append(((IntObjectInspector) poi).get(o)); + break; + } + case LONG: { + sb.append(((LongObjectInspector) poi).get(o)); + break; + } + case FLOAT: { + sb.append(((FloatObjectInspector) poi).get(o)); + break; + } + case DOUBLE: { + sb.append(((DoubleObjectInspector) poi).get(o)); + break; + } + case STRING: { + String s = + SerDeUtils.escapeString(((StringObjectInspector) poi).getPrimitiveJavaObject(o)); + appendWithQuotes(sb, s); + break; + } + case BINARY: + byte[] b = ((BinaryObjectInspector) oi).getPrimitiveJavaObject(o); + Text txt = new Text(); + txt.set(b, 0, b.length); + appendWithQuotes(sb, SerDeUtils.escapeString(txt.toString())); + break; + case DATE: + Date d = ((DateObjectInspector) poi).getPrimitiveJavaObject(o); + appendWithQuotes(sb, d.toString()); + break; + case TIMESTAMP: { + Timestamp t = ((TimestampObjectInspector) poi).getPrimitiveJavaObject(o); + appendWithQuotes(sb, t.toString()); + break; + } + case DECIMAL: + sb.append(((HiveDecimalObjectInspector) poi).getPrimitiveJavaObject(o)); + break; + case VARCHAR: { + String s = SerDeUtils.escapeString( + ((HiveVarcharObjectInspector) poi).getPrimitiveJavaObject(o).toString()); + appendWithQuotes(sb, s); + break; + } + case CHAR: { + //this should use HiveChar.getPaddedValue() but it's protected; currently (v0.13) + // HiveChar.toString() returns getPaddedValue() + String s = SerDeUtils.escapeString( + ((HiveCharObjectInspector) poi).getPrimitiveJavaObject(o).toString()); + appendWithQuotes(sb, s); + break; + } + default: + throw new RuntimeException("Unknown primitive type: " + poi.getPrimitiveCategory()); } - break; } - case LIST: { - ListObjectInspector loi = (ListObjectInspector) oi; - ObjectInspector 
listElementObjectInspector = loi + break; + } + case LIST: { + ListObjectInspector loi = (ListObjectInspector) oi; + ObjectInspector listElementObjectInspector = loi .getListElementObjectInspector(); - List<?> olist = loi.getList(o); - if (olist == null) { - sb.append("null"); - } else { - sb.append(SerDeUtils.LBRACKET); - for (int i = 0; i < olist.size(); i++) { - if (i > 0) { - sb.append(SerDeUtils.COMMA); - } - buildJSONString(sb, olist.get(i), listElementObjectInspector); + List<?> olist = loi.getList(o); + if (olist == null) { + sb.append("null"); + } else { + sb.append(SerDeUtils.LBRACKET); + for (int i = 0; i < olist.size(); i++) { + if (i > 0) { + sb.append(SerDeUtils.COMMA); } - sb.append(SerDeUtils.RBRACKET); + buildJSONString(sb, olist.get(i), listElementObjectInspector); } - break; + sb.append(SerDeUtils.RBRACKET); } - case MAP: { - MapObjectInspector moi = (MapObjectInspector) oi; - ObjectInspector mapKeyObjectInspector = moi.getMapKeyObjectInspector(); - ObjectInspector mapValueObjectInspector = moi + break; + } + case MAP: { + MapObjectInspector moi = (MapObjectInspector) oi; + ObjectInspector mapKeyObjectInspector = moi.getMapKeyObjectInspector(); + ObjectInspector mapValueObjectInspector = moi .getMapValueObjectInspector(); - Map<?, ?> omap = moi.getMap(o); - if (omap == null) { - sb.append("null"); - } else { - sb.append(SerDeUtils.LBRACE); - boolean first = true; - for (Object entry : omap.entrySet()) { - if (first) { - first = false; - } else { - sb.append(SerDeUtils.COMMA); - } - Map.Entry<?, ?> e = (Map.Entry<?, ?>) entry; - StringBuilder keyBuilder = new StringBuilder(); - buildJSONString(keyBuilder, e.getKey(), mapKeyObjectInspector); - String keyString = keyBuilder.toString().trim(); - if ((!keyString.isEmpty()) && (keyString.charAt(0) != SerDeUtils.QUOTE)) { - appendWithQuotes(sb, keyString); - } else { - sb.append(keyString); - } - sb.append(SerDeUtils.COLON); - buildJSONString(sb, e.getValue(), mapValueObjectInspector); + Map<?, ?> omap 
= moi.getMap(o); + if (omap == null) { + sb.append("null"); + } else { + sb.append(SerDeUtils.LBRACE); + boolean first = true; + for (Object entry : omap.entrySet()) { + if (first) { + first = false; + } else { + sb.append(SerDeUtils.COMMA); + } + Map.Entry<?, ?> e = (Map.Entry<?, ?>) entry; + StringBuilder keyBuilder = new StringBuilder(); + buildJSONString(keyBuilder, e.getKey(), mapKeyObjectInspector); + String keyString = keyBuilder.toString().trim(); + if ((!keyString.isEmpty()) && (keyString.charAt(0) != SerDeUtils.QUOTE)) { + appendWithQuotes(sb, keyString); + } else { + sb.append(keyString); } - sb.append(SerDeUtils.RBRACE); + sb.append(SerDeUtils.COLON); + buildJSONString(sb, e.getValue(), mapValueObjectInspector); } - break; + sb.append(SerDeUtils.RBRACE); } - case STRUCT: { - StructObjectInspector soi = (StructObjectInspector) oi; - List<? extends StructField> structFields = soi.getAllStructFieldRefs(); - if (o == null) { - sb.append("null"); - } else { - sb.append(SerDeUtils.LBRACE); - for (int i = 0; i < structFields.size(); i++) { - if (i > 0) { - sb.append(SerDeUtils.COMMA); - } - appendWithQuotes(sb, structFields.get(i).getFieldName()); - sb.append(SerDeUtils.COLON); - buildJSONString(sb, soi.getStructFieldData(o, structFields.get(i)), - structFields.get(i).getFieldObjectInspector()); + break; + } + case STRUCT: { + StructObjectInspector soi = (StructObjectInspector) oi; + List<? 
extends StructField> structFields = soi.getAllStructFieldRefs(); + if (o == null) { + sb.append("null"); + } else { + sb.append(SerDeUtils.LBRACE); + for (int i = 0; i < structFields.size(); i++) { + if (i > 0) { + sb.append(SerDeUtils.COMMA); } - sb.append(SerDeUtils.RBRACE); + appendWithQuotes(sb, structFields.get(i).getFieldName()); + sb.append(SerDeUtils.COLON); + buildJSONString(sb, soi.getStructFieldData(o, structFields.get(i)), + structFields.get(i).getFieldObjectInspector()); } - break; + sb.append(SerDeUtils.RBRACE); } - case UNION: { - UnionObjectInspector uoi = (UnionObjectInspector) oi; - if (o == null) { - sb.append("null"); - } else { - sb.append(SerDeUtils.LBRACE); - sb.append(uoi.getTag(o)); - sb.append(SerDeUtils.COLON); - buildJSONString(sb, uoi.getField(o), + break; + } + case UNION: { + UnionObjectInspector uoi = (UnionObjectInspector) oi; + if (o == null) { + sb.append("null"); + } else { + sb.append(SerDeUtils.LBRACE); + sb.append(uoi.getTag(o)); + sb.append(SerDeUtils.COLON); + buildJSONString(sb, uoi.getField(o), uoi.getObjectInspectors().get(uoi.getTag(o))); - sb.append(SerDeUtils.RBRACE); - } - break; + sb.append(SerDeUtils.RBRACE); } - default: - throw new RuntimeException("Unknown type in ObjectInspector!"); + break; + } + default: + throw new RuntimeException("Unknown type in ObjectInspector!"); } } /** - * Returns an object inspector for the specified schema that - * is capable of reading in the object representation of the JSON string + * Returns an object inspector for the specified schema that + * is capable of reading in the object representation of the JSON string */ @Override public ObjectInspector getObjectInspector() throws SerDeException { - return cachedObjectInspector; + return structReader.getObjectInspector(); } @Override @@ -663,4 +376,12 @@ public class JsonSerDe extends AbstractSerDe { return null; } + public StructTypeInfo getTypeInfo() { + return rowTypeInfo; + } + + public void setWriteablesUsage(boolean b) { + 
structReader.setWritablesUsage(b); + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/1105ef39/serde/src/java/org/apache/hadoop/hive/serde2/json/HiveJsonStructReader.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/json/HiveJsonStructReader.java b/serde/src/java/org/apache/hadoop/hive/serde2/json/HiveJsonStructReader.java new file mode 100644 index 0000000..ec4efad --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/json/HiveJsonStructReader.java @@ -0,0 +1,402 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.serde2.json; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.CharacterCodingException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.hive.common.type.Date; +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.Text; +import org.apache.hive.common.util.TimestampParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; + +public class 
HiveJsonStructReader { + + private static final Logger LOG = LoggerFactory.getLogger(HiveJsonStructReader.class); + + private ObjectInspector oi; + private JsonFactory factory; + + + Set<String> reportedUnknownFieldNames = new HashSet<>(); + + private static boolean ignoreUnknownFields; + private static boolean hiveColIndexParsing; + private boolean writeablePrimitives; + + private TimestampParser tsParser; + + public HiveJsonStructReader(TypeInfo t) { + this(t, new TimestampParser()); + } + + public HiveJsonStructReader(TypeInfo t, TimestampParser tsParser) { + this.tsParser = tsParser; + oi = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(t); + factory = new JsonFactory(); + } + + public Object parseStruct(String text) throws JsonParseException, IOException, SerDeException { + JsonParser parser = factory.createParser(text); + return parseInternal(parser); + } + + public Object parseStruct(InputStream is) throws JsonParseException, IOException, SerDeException { + JsonParser parser = factory.createParser(is); + return parseInternal(parser); + } + + private Object parseInternal(JsonParser parser) throws SerDeException { + try { + parser.nextToken(); + Object res = parseDispatcher(parser, oi); + return res; + } catch (Exception e) { + String locationStr = parser.getCurrentLocation().getLineNr() + "," + parser.getCurrentLocation().getColumnNr(); + throw new SerDeException("at[" + locationStr + "]: " + e.getMessage(), e); + } + } + + private Object parseDispatcher(JsonParser parser, ObjectInspector oi) + throws JsonParseException, IOException, SerDeException { + + switch (oi.getCategory()) { + case PRIMITIVE: + return parsePrimitive(parser, (PrimitiveObjectInspector) oi); + case LIST: + return parseList(parser, (ListObjectInspector) oi); + case STRUCT: + return parseStruct(parser, (StructObjectInspector) oi); + case MAP: + return parseMap(parser, (MapObjectInspector) oi); + default: + throw new SerDeException("parsing of: " + oi.getCategory() + " is not 
handled");
+    }
+  }
+
+  /**
+   * Parses a JSON object into a map whose keys and values are converted according to the
+   * key and value inspectors of {@code oi}. On return the parser is positioned on the token
+   * that follows the consumed value.
+   *
+   * @param parser parser positioned on the first token of the value
+   * @param oi inspector describing the expected map type
+   * @return the entries in the order they were read, or {@code null} for a JSON null
+   * @throws IOException if the underlying parser fails
+   * @throws SerDeException if the value is not a JSON object, the key inspector is not
+   *         primitive, or an unexpected token is met where a field name is required
+   */
+  private Object parseMap(JsonParser parser, MapObjectInspector oi) throws IOException, SerDeException {
+
+    if (parser.getCurrentToken() == JsonToken.VALUE_NULL) {
+      parser.nextToken();
+      return null;
+    }
+
+    if (parser.getCurrentToken() != JsonToken.START_OBJECT) {
+      throw new SerDeException("map expected");
+    }
+
+    if (!(oi.getMapKeyObjectInspector() instanceof PrimitiveObjectInspector)) {
+      throw new SerDeException("map key must be a primitive");
+    }
+    PrimitiveObjectInspector keyOI = (PrimitiveObjectInspector) oi.getMapKeyObjectInspector();
+    ObjectInspector valOI = oi.getMapValueObjectInspector();
+
+    // Allocate only once the input is known to be a well-formed map start.
+    Map<Object, Object> ret = new LinkedHashMap<>();
+
+    JsonToken currentToken = parser.nextToken();
+    while (currentToken != null && currentToken != JsonToken.END_OBJECT) {
+
+      if (currentToken != JsonToken.FIELD_NAME) {
+        throw new SerDeException("unexpected token: " + currentToken);
+      }
+
+      // parseMapKey consumes the field name; parseDispatcher then consumes the value.
+      Object key = parseMapKey(parser, keyOI);
+      Object val = parseDispatcher(parser, valOI);
+      ret.put(key, val);
+
+      currentToken = parser.getCurrentToken();
+    }
+    if (currentToken != null) {
+      // Step past END_OBJECT.
+      parser.nextToken();
+    }
+    return ret;
+
+  }
+
+  /**
+   * Parses a JSON object into an array holding one slot per field of {@code oi}, indexed by
+   * {@link StructField#getFieldID()}. Slots of fields that do not occur in the input stay
+   * {@code null}. On return the parser is positioned on the token that follows the consumed
+   * value.
+   *
+   * <p>When field lookup fails and {@code ignoreUnknownFields} is set, the field is skipped
+   * and a warning is logged the first time each name is seen; otherwise the failure is
+   * reported as a {@link SerDeException}.
+   *
+   * @param parser parser positioned on the first token of the value
+   * @param oi inspector describing the expected struct type
+   * @return the field values, or {@code null} for a JSON null
+   * @throws SerDeException if the value is not a JSON object, a field is undeclared, or
+   *         parsing of a field value fails (the field name is added to the message)
+   */
+  private Object parseStruct(JsonParser parser, StructObjectInspector oi)
+      throws JsonParseException, IOException, SerDeException {
+
+    if (parser.getCurrentToken() == JsonToken.VALUE_NULL) {
+      parser.nextToken();
+      return null;
+    }
+    if (parser.getCurrentToken() != JsonToken.START_OBJECT) {
+      throw new SerDeException("struct expected");
+    }
+
+    Object[] ret = new Object[oi.getAllStructFieldRefs().size()];
+
+    JsonToken currentToken = parser.nextToken();
+    while (currentToken != null && currentToken != JsonToken.END_OBJECT) {
+
+      switch (currentToken) {
+      case FIELD_NAME:
+        String name = parser.getCurrentName();
+        try {
+          StructField field = null;
+          try {
+            field = getStructField(oi, name);
+          } catch (RuntimeException e) {
+            if (ignoreUnknownFields) {
+              if (!reportedUnknownFieldNames.contains(name)) {
+                LOG.warn("ignoring field:" + name);
+                reportedUnknownFieldNames.add(name);
+              }
+              // Step onto the value of the unknown field and discard it.
+              parser.nextToken();
+              skipValue(parser);
+              break;
+            }
+            // Keep the lookup failure as the cause instead of dropping it.
+            throw new SerDeException("undeclared field", e);
+          }
+          if (field == null) {
+            throw new SerDeException("undeclared field");
+          }
+          parser.nextToken();
+          ret[field.getFieldID()] = parseDispatcher(parser, field.getFieldObjectInspector());
+        } catch (Exception e) {
+          throw new SerDeException("struct field " + name + ": " + e.getMessage(), e);
+        }
+        break;
+      default:
+        throw new SerDeException("unexpected token: " + currentToken);
+      }
+      currentToken = parser.getCurrentToken();
+    }
+    if (currentToken != null) {
+      // Step past END_OBJECT.
+      parser.nextToken();
+    }
+    return ret;
+  }
+
+  /**
+   * Resolves a JSON field name to a struct field. When {@code hiveColIndexParsing} is enabled
+   * and the name has the form {@code _col<N>}, the field at position {@code N} is returned;
+   * otherwise the lookup is done by name.
+   *
+   * <p>{@link #parseStruct} treats any {@link RuntimeException} raised here as a failed
+   * lookup.
+   *
+   * @param oi inspector of the struct being parsed
+   * @param name field name as it appears in the JSON input
+   * @return the matching field as reported by {@code oi}
+   */
+  private StructField getStructField(StructObjectInspector oi, String name) {
+    if (hiveColIndexParsing) {
+      int colIndex = getColIndex(name);
+      if (colIndex >= 0) {
+        return oi.getAllStructFieldRefs().get(colIndex);
+      }
+    }
+    // FIXME: linear scan inside the below method...get a map here or something..
+    return oi.getStructFieldRef(name);
+  }
+
+  /** Matches positional column names of the form {@code _col<N>}; compiled once and shared. */
+  static final Pattern internalPattern = Pattern.compile("^_col([0-9]+)$");
+
+  /**
+   * Extracts the column position from a positional column name.
+   *
+   * @param internalName candidate name, e.g. {@code _col12}
+   * @return the position encoded in the name, or {@code -1} if the name is not of the form
+   *         {@code _col<N>}
+   */
+  private int getColIndex(String internalName) {
+    // Implemented locally with a regex that accepts any number of digits, because the
+    // implementation this replaced recognized only single-digit columns.
+    Matcher m = internalPattern.matcher(internalName);
+    if (!m.matches()) {
+      return -1;
+    } else {
+      return Integer.parseInt(m.group(1));
+    }
+  }
+
+  /**
+   * Skips the value the parser is currently positioned on. A scalar is skipped with a single
+   * advance; an array or object is skipped through its matching end token. On return the
+   * parser is positioned on the token after the skipped value.
+   *
+   * @param parser parser positioned on the first token of the value to discard
+   */
+  private static void skipValue(JsonParser parser) throws JsonParseException, IOException {
+
+    int array = 0;
+    int object = 0;
+    do {
+      JsonToken currentToken = parser.getCurrentToken();
+      if (currentToken == JsonToken.START_ARRAY) {
+        array++;
+      }
+      if (currentToken == JsonToken.END_ARRAY) {
+        array--;
+      }
+      if (currentToken == JsonToken.START_OBJECT) {
+        object++;
+      }
+      if (currentToken == JsonToken.END_OBJECT) {
+        object--;
+      }
+
+      parser.nextToken();
+
+    } while (array > 0 || object > 0);
+
+  }
+
+  /**
+   * Parses a JSON array into a list whose elements are converted according to the element
+   * inspector of {@code oi}. On return the parser is positioned on the token that follows the
+   * consumed value.
+   *
+   * @param parser parser positioned on the first token of the value
+   * @param oi inspector describing the expected list type
+   * @return the parsed elements in input order, or {@code null} for a JSON null
+   * @throws SerDeException if the value is not a JSON array or an element fails to parse
+   */
+  private Object parseList(JsonParser parser, ListObjectInspector oi)
+      throws JsonParseException, IOException, SerDeException {
+
+    if (parser.getCurrentToken() == JsonToken.VALUE_NULL) {
+      parser.nextToken();
+      return null;
+    }
+
+    if (parser.getCurrentToken() != JsonToken.START_ARRAY) {
+      throw new SerDeException("array expected");
+    }
+
+    List<Object> ret = new ArrayList<>();
+    ObjectInspector eOI = oi.getListElementObjectInspector();
+    JsonToken currentToken = parser.nextToken();
+    try {
+      while (currentToken != null && currentToken != JsonToken.END_ARRAY) {
+        ret.add(parseDispatcher(parser, eOI));
+        currentToken = parser.getCurrentToken();
+      }
+    } catch (Exception e) {
+      throw new SerDeException("array: " + e.getMessage(), e);
+    }
+
+    if (currentToken != null) {
+      // Step past END_ARRAY, mirroring parseMap and parseStruct.
+      parser.nextToken();
+    }
+
+    return ret;
+  }
+
+  /**
+   * Parses a scalar JSON value. Booleans, numbers and strings are converted from their text
+   * form by {@link #getObjectOfCorrespondingPrimitiveType}; a JSON null yields {@code null}.
+   * Unless there is no current token, the parser is advanced by one token before returning,
+   * also when an exception is thrown.
+   *
+   * @param parser parser positioned on the value
+   * @param oi inspector describing the expected primitive type
+   * @return the converted value, or {@code null} for a JSON null or when there is no token
+   * @throws SerDeException if the current token is not a scalar value
+   */
+  private Object parsePrimitive(JsonParser parser, PrimitiveObjectInspector oi)
+      throws SerDeException, IOException {
+    JsonToken currentToken = parser.getCurrentToken();
+    if (currentToken == null) {
+      return null;
+    }
+    try {
+      switch (currentToken) {
+      case VALUE_FALSE:
+      case VALUE_TRUE:
+      case VALUE_NUMBER_INT:
+      case VALUE_NUMBER_FLOAT:
+      case VALUE_STRING:
+        return getObjectOfCorrespondingPrimitiveType(parser.getValueAsString(), oi);
+      case VALUE_NULL:
+        return null;
+      default:
+        throw new SerDeException("unexpected token type: " + currentToken);
+      }
+    } finally {
+      parser.nextToken();
+
+    }
+  }
+
+  /**
+   * Converts the text form of a scalar into the object type dictated by {@code oi}. When
+   * {@code writeablePrimitives} is set the conversion is delegated to a converter obtained
+   * from {@link ObjectInspectorConverters}; otherwise a plain Java/Hive value is built
+   * according to the primitive category.
+   *
+   * @param s text of the JSON scalar or field name
+   * @param oi inspector describing the target primitive type
+   * @return the converted value; {@code null} if a BINARY value cannot be decoded
+   * @throws IOException if the primitive category is not handled here
+   */
+  private Object getObjectOfCorrespondingPrimitiveType(String s, PrimitiveObjectInspector oi)
+      throws IOException {
+    PrimitiveTypeInfo typeInfo = oi.getTypeInfo();
+    if (writeablePrimitives) {
+      Converter c = ObjectInspectorConverters.getConverter(
+          PrimitiveObjectInspectorFactory.javaStringObjectInspector, oi);
+      return c.convert(s);
+    }
+
+    switch (typeInfo.getPrimitiveCategory()) {
+    case INT:
+      return Integer.valueOf(s);
+    case BYTE:
+      return Byte.valueOf(s);
+    case SHORT:
+      return Short.valueOf(s);
+    case LONG:
+      return Long.valueOf(s);
+    case BOOLEAN:
+      return (s.equalsIgnoreCase("true"));
+    case FLOAT:
+      return Float.valueOf(s);
+    case DOUBLE:
+      return Double.valueOf(s);
+    case STRING:
+      return s;
+    case BINARY:
+      try {
+        // Text.decode interprets its input as UTF-8, so encode with UTF-8 explicitly on
+        // both sides rather than with the platform default charset.
+        byte[] utf8 = s.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        String t = Text.decode(utf8, 0, utf8.length);
+        return t.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+      } catch (CharacterCodingException e) {
+        LOG.warn("Error generating json binary type from object.", e);
+        return null;
+      }
+    case DATE:
+      return Date.valueOf(s);
+    case TIMESTAMP:
+      return tsParser.parseTimestamp(s);
+    case DECIMAL:
+      return HiveDecimal.create(s);
+    case VARCHAR:
+      return new HiveVarchar(s, ((BaseCharTypeInfo) typeInfo).getLength());
+    case CHAR:
+      return new HiveChar(s, ((BaseCharTypeInfo) typeInfo).getLength());
+    }
+    throw new IOException("Could not convert from string to map type " + typeInfo.getTypeName());
+  }
+
+  /**
+   * Converts the field name the parser is positioned on into a map key of the type dictated
+   * by {@code oi}. Unless there is no current token, the parser is advanced by one token
+   * before returning, also when an exception is thrown.
+   *
+   * @param parser parser positioned on a field name
+   * @param oi inspector describing the key type
+   * @return the converted key, or {@code null} for a JSON null or when there is no token
+   * @throws SerDeException if the current token is neither a field name nor a JSON null
+   */
+  private Object parseMapKey(JsonParser parser, PrimitiveObjectInspector oi) throws SerDeException, IOException {
+    JsonToken currentToken = parser.getCurrentToken();
+    if (currentToken == null) {
+      return null;
+    }
+    try {
+      switch (currentToken) {
+      case FIELD_NAME:
+        return getObjectOfCorrespondingPrimitiveType(parser.getValueAsString(), oi);
+      case VALUE_NULL:
+        return null;
+      default:
+        throw new SerDeException("unexpected token type: " + currentToken);
+      }
+    } finally {
+      parser.nextToken();
+
+    }
+  }
+
+  /**
+   * Controls how struct fields that cannot be resolved are handled: when {@code true} they
+   * are skipped with a one-time warning per name, otherwise parsing fails.
+   */
+  public void setIgnoreUnknownFields(boolean b) {
+    ignoreUnknownFields = b;
+  }
+
+  /**
+   * Controls whether field names of the form {@code _col<N>} are resolved by position
+   * instead of by name.
+   */
+  public void enableHiveColIndexParsing(boolean b) {
+    hiveColIndexParsing = b;
+  }
+
+  /**
+   * Controls whether scalar values are produced through an {@link ObjectInspectorConverters}
+   * converter targeting the field's inspector instead of as plain Java/Hive values.
+   */
+  public void setWritablesUsage(boolean b) {
+    writeablePrimitives = b;
+  }
+
+  /** Returns the object inspector held by this reader. */
+  public ObjectInspector getObjectInspector() {
+    return oi;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1105ef39/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java index cabb64c..416bd67 100644 --- a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java @@ -30,7 +30,7 @@ import com.google.common.base.Joiner; /** * Streaming Writer handles utf8 encoded Json (Strict syntax). - * Uses org.apache.hadoop.hive.serde2.JsonSerDe to process Json input + * Uses {@link JsonSerDe} to process Json input * * NOTE: This record writer is NOT thread-safe. Use one record writer per streaming connection. */