HAWQ-992. PXF Hive data type check in Fragmenter too restrictive.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/e2416f49 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/e2416f49 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/e2416f49 Branch: refs/heads/HAWQ-992 Commit: e2416f498ebb2be29712a1042c4e9bae99f523ff Parents: 24f5e36 Author: Oleksandr Diachenko <[email protected]> Authored: Fri Aug 26 16:04:53 2016 -0700 Committer: Oleksandr Diachenko <[email protected]> Committed: Fri Aug 26 16:04:53 2016 -0700 ---------------------------------------------------------------------- .../hawq/pxf/api/utilities/EnumHawqType.java | 28 +- .../plugins/hive/HiveColumnarSerdeResolver.java | 2 +- .../plugins/hive/HiveInputFormatFragmenter.java | 4 +- .../hive/utilities/EnumHiveToHawqType.java | 61 +- .../plugins/hive/utilities/HiveUtilities.java | 45 +- .../hive/utilities/HiveUtilitiesTest.java | 65 ++ .../pxf/service/utilities/ProtocolData.java | 5 +- .../org/apache/hawq/pxf/service/Bridge.java | 40 + .../hawq/pxf/service/BridgeInputBuilder.java | 71 ++ .../hawq/pxf/service/BridgeOutputBuilder.java | 394 ++++++++ .../hawq/pxf/service/FragmenterFactory.java | 37 + .../hawq/pxf/service/FragmentsResponse.java | 89 ++ .../pxf/service/FragmentsResponseFormatter.java | 157 ++++ .../hawq/pxf/service/GPDBWritableMapper.java | 135 +++ .../pxf/service/MetadataFetcherFactory.java | 36 + .../hawq/pxf/service/MetadataResponse.java | 93 ++ .../pxf/service/MetadataResponseFormatter.java | 95 ++ .../org/apache/hawq/pxf/service/ReadBridge.java | 179 ++++ .../hawq/pxf/service/ReadSamplingBridge.java | 131 +++ .../apache/hawq/pxf/service/WriteBridge.java | 117 +++ .../hawq/pxf/service/io/BufferWritable.java | 98 ++ .../hawq/pxf/service/io/GPDBWritable.java | 893 +++++++++++++++++++ .../org/apache/hawq/pxf/service/io/Text.java | 399 +++++++++ .../apache/hawq/pxf/service/io/Writable.java | 50 ++ .../apache/hawq/pxf/service/package-info.java | 
23 + .../hawq/pxf/service/rest/BridgeResource.java | 189 ++++ .../pxf/service/rest/ClusterNodesResource.java | 148 +++ .../pxf/service/rest/FragmenterResource.java | 154 ++++ .../pxf/service/rest/InvalidPathResource.java | 179 ++++ .../hawq/pxf/service/rest/MetadataResource.java | 124 +++ .../hawq/pxf/service/rest/RestResource.java | 71 ++ .../service/rest/ServletLifecycleListener.java | 63 ++ .../hawq/pxf/service/rest/VersionResource.java | 88 ++ .../hawq/pxf/service/rest/WritableResource.java | 174 ++++ .../pxf/service/utilities/AnalyzeUtils.java | 147 +++ .../service/utilities/CustomWebappLoader.java | 231 +++++ .../pxf/service/utilities/Log4jConfigure.java | 66 ++ .../pxf/service/utilities/ProtocolData.java | 491 ++++++++++ .../hawq/pxf/service/utilities/SecureLogin.java | 61 ++ .../hawq/pxf/service/utilities/SecuredHDFS.java | 114 +++ 40 files changed, 5509 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumHawqType.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumHawqType.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumHawqType.java index 01d40f0..f35fa5e 100644 --- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumHawqType.java +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumHawqType.java @@ -56,23 +56,27 @@ public enum EnumHawqType { DateType("date", DataType.DATE), TimestampType("timestamp", DataType.TIMESTAMP), BoolType("bool", DataType.BOOLEAN), - NumericType("numeric", DataType.NUMERIC, (byte) 2, true), + NumericType("numeric", DataType.NUMERIC, (byte) 2, false), BpcharType("bpchar", DataType.BPCHAR, (byte) 1, true); private DataType dataType; private String typeName; private byte modifiersNum; - private boolean 
validateIntegerModifiers; + private boolean mandatoryModifiers; EnumHawqType(String typeName, DataType dataType) { this.typeName = typeName; this.dataType = dataType; } - EnumHawqType(String typeName, DataType dataType, byte modifiersNum, boolean validateIntegerModifiers) { + EnumHawqType(String typeName, DataType dataType, byte modifiersNum) { this(typeName, dataType); this.modifiersNum = modifiersNum; - this.validateIntegerModifiers = validateIntegerModifiers; + } + + EnumHawqType(String typeName, DataType dataType, byte modifiersNum, boolean mandatoryModifiers) { + this(typeName, dataType, modifiersNum); + this.setMandatoryModifiers(mandatoryModifiers); } /** @@ -93,19 +97,19 @@ public enum EnumHawqType { /** * - * @return whether modifiers should be integers - */ - public boolean getValidateIntegerModifiers() { - return this.validateIntegerModifiers; - } - - /** - * * @return data type */ public DataType getDataType() { return this.dataType; } + + public boolean isMandatoryModifiers() { + return mandatoryModifiers; + } + + public void setMandatoryModifiers(boolean mandatoryModifiers) { + this.mandatoryModifiers = mandatoryModifiers; + } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java index 43e3b65..606ddc6 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java @@ -127,7 +127,7 @@ public class HiveColumnarSerdeResolver extends HiveResolver { for (int i = 0; i < numberOfDataColumns; i++) { ColumnDescriptor column = input.getColumn(i); String 
columnName = column.columnName(); - String columnType = HiveUtilities.toHiveType(DataType.get(column.columnTypeCode())); + String columnType = HiveUtilities.toCompatibleHiveType(DataType.get(column.columnTypeCode())); columnNames.append(delim).append(columnName); columnTypes.append(delim).append(columnType); delim = ","; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java index b944206..ccc8fa7 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java @@ -148,14 +148,14 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter { for (FieldSchema hiveCol : hiveColumns) { ColumnDescriptor colDesc = inputData.getColumn(index++); DataType colType = DataType.get(colDesc.columnTypeCode()); - HiveUtilities.compareTypes(colType, hiveCol.getType(), colDesc.columnName()); + HiveUtilities.validateTypeCompatible(colType, colDesc.columnTypeModifiers(), hiveCol.getType(), colDesc.columnName()); } // check partition fields List<FieldSchema> hivePartitions = tbl.getPartitionKeys(); for (FieldSchema hivePart : hivePartitions) { ColumnDescriptor colDesc = inputData.getColumn(index++); DataType colType = DataType.get(colDesc.columnTypeCode()); - HiveUtilities.compareTypes(colType, hivePart.getType(), colDesc.columnName()); + HiveUtilities.validateTypeCompatible(colType, colDesc.columnTypeModifiers(), hivePart.getType(), colDesc.columnName()); } } 
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java index 1cedaa8..9b24642 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java @@ -19,6 +19,11 @@ package org.apache.hawq.pxf.plugins.hive.utilities; +import java.util.Arrays; +import java.util.Comparator; +import java.util.SortedSet; +import java.util.TreeSet; + import org.apache.hawq.pxf.api.io.DataType; import org.apache.hawq.pxf.api.utilities.EnumHawqType; import org.apache.hawq.pxf.api.UnsupportedTypeException; @@ -30,8 +35,8 @@ import org.apache.hawq.pxf.api.UnsupportedTypeException; */ public enum EnumHiveToHawqType { - TinyintType("tinyint", EnumHawqType.Int2Type), - SmallintType("smallint", EnumHawqType.Int2Type), + TinyintType("tinyint", EnumHawqType.Int2Type, (byte) 1), + SmallintType("smallint", EnumHawqType.Int2Type, (byte) 2), IntType("int", EnumHawqType.Int4Type), BigintType("bigint", EnumHawqType.Int8Type), BooleanType("boolean", EnumHawqType.BoolType), @@ -52,11 +57,17 @@ public enum EnumHiveToHawqType { private String typeName; private EnumHawqType hawqType; private String splitExpression; + private byte size; EnumHiveToHawqType(String typeName, EnumHawqType hawqType) { this.typeName = typeName; this.hawqType = hawqType; } + + EnumHiveToHawqType(String typeName, EnumHawqType hawqType, byte size) { + this(typeName, hawqType); + this.setSize(size); + } EnumHiveToHawqType(String typeName, EnumHawqType hawqType, String splitExpression) { this(typeName, hawqType); @@ -111,15 
+122,51 @@ public enum EnumHiveToHawqType { + hiveType + " to HAWQ's type"); } - public static EnumHiveToHawqType getHawqToHiveType(DataType dataType) { + public static EnumHiveToHawqType getCompatibleHawqToHiveType(DataType dataType) { - for (EnumHiveToHawqType t : values()) { + SortedSet<EnumHiveToHawqType> types = new TreeSet<EnumHiveToHawqType>(new Comparator<EnumHiveToHawqType>() { + public int compare(EnumHiveToHawqType a, EnumHiveToHawqType b){ + return Byte.compare(a.getSize(), b.getSize()); + } + }); + + for (EnumHiveToHawqType t : values()) { if (t.getHawqType().getDataType().equals(dataType)) { - return t; + types.add(t); + } + } + + if (types.size() == 0) + throw new UnsupportedTypeException("Unable to map HAWQ's type: " + + dataType + " to Hive's type"); + + return types.last(); + } + + public static String[] extractModifiers(String hiveType) { + String[] result = null; + for (EnumHiveToHawqType t : values()) { + String hiveTypeName = hiveType; + String splitExpression = t.getSplitExpression(); + if (splitExpression != null) { + String[] tokens = hiveType.split(splitExpression); + hiveTypeName = tokens[0]; + result = Arrays.copyOfRange(tokens, 1, tokens.length); + } + if (t.getTypeName().toLowerCase().equals(hiveTypeName.toLowerCase())) { + return result; } } - throw new UnsupportedTypeException("Unable to map HAWQ's type: " - + dataType + " to Hive's type"); + throw new UnsupportedTypeException("Unable to map Hive's type: " + + hiveType + " to HAWQ's type"); + } + + public byte getSize() { + return size; + } + + public void setSize(byte size) { + this.size = size; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java 
b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java index 579ab0b..6bda9b7 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java @@ -139,7 +139,7 @@ public class HiveUtilities { + ", actual number of modifiers: " + modifiers.length); } - if (hawqType.getValidateIntegerModifiers() && !verifyIntegerModifiers(modifiers)) { + if (!verifyIntegerModifiers(modifiers)) { throw new UnsupportedTypeException("HAWQ does not support type " + hiveType + " (Field " + fieldName + "), modifiers should be integers"); } } @@ -279,24 +279,41 @@ public class HiveUtilities { * @return Hive type * @throws UnsupportedTypeException if type is not supported */ - public static String toHiveType(DataType type) { + public static String toCompatibleHiveType(DataType type) { - EnumHiveToHawqType hiveToHawqType = EnumHiveToHawqType.getHawqToHiveType(type); + EnumHiveToHawqType hiveToHawqType = EnumHiveToHawqType.getCompatibleHawqToHiveType(type); return hiveToHawqType.getTypeName(); } - public static void compareTypes(DataType type, String hiveType, String columnName) { - String convertedHive = toHiveType(type); - if (!convertedHive.equals(hiveType) - && !(convertedHive.equals("smallint") && hiveType.equals("tinyint"))) { - throw new UnsupportedTypeException( - "Schema mismatch definition:" - + " (Hive type " + hiveType + ", HAWQ type " - + type.toString() + ")"); + + + public static void validateTypeCompatible(DataType hawqDataType, String[] hawqTypeMods, String hiveType, String hawqColumnName) { + + EnumHiveToHawqType hiveToHawqType = EnumHiveToHawqType.getHiveToHawqType(hiveType); + EnumHawqType expectedHawqType = hiveToHawqType.getHawqType(); + + if ((hawqTypeMods == null || hawqTypeMods.length == 0) && expectedHawqType.isMandatoryModifiers()) + throw new UnsupportedTypeException("Invalid definition for column " + 
hawqColumnName + ": modifiers are mandatory for type " + expectedHawqType.getTypeName()); + + switch (hawqDataType) { + case NUMERIC: + String[] hiveTypeModifiers = EnumHiveToHawqType.extractModifiers(hiveType); + for (int i = 0; hawqTypeMods != null && i < hawqTypeMods.length; i++) { + if (Integer.valueOf(hawqTypeMods[i]) < Integer + .valueOf(hiveTypeModifiers[i])) + throw new UnsupportedTypeException( + "Invalid definition for column " + hawqColumnName + + ": modifiers are not compatible, " + + Arrays.toString(hiveTypeModifiers) + ", " + + Arrays.toString(hawqTypeMods)); + } + break; } - if (LOG.isDebugEnabled()) { - LOG.debug(" Hive type " + hiveType - + ", HAWQ type " + type.toString()); + + if (!hiveToHawqType.getHawqType().equals(expectedHawqType)) { + throw new UnsupportedTypeException("Invalid definition for column " + hawqColumnName + + ": expected HAWQ type " + expectedHawqType.getTypeName() + + ", actual HAWQ type " + hiveToHawqType.getHawqType().getTypeName() + ")"); } } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilitiesTest.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilitiesTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilitiesTest.java index e9b024a..e94351a 100644 --- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilitiesTest.java +++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilitiesTest.java @@ -22,11 +22,16 @@ package org.apache.hawq.pxf.plugins.hive.utilities; import static org.junit.Assert.*; +import java.util.Arrays; + +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.api.utilities.EnumHawqType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.junit.Test; import 
org.apache.hawq.pxf.api.Metadata; import org.apache.hawq.pxf.api.UnsupportedTypeException; import org.apache.hawq.pxf.plugins.hive.utilities.EnumHiveToHawqType; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; public class HiveUtilitiesTest { @@ -121,6 +126,66 @@ public class HiveUtilitiesTest { } @Test + public void testCompatibleHiveType() { + String compatibleTypeName = HiveUtilities.toCompatibleHiveType(DataType.SMALLINT); + assertEquals(compatibleTypeName, EnumHiveToHawqType.SmallintType.getTypeName()); + } + + @Test + public void validateSchema() throws Exception { + String columnName = "abc"; + + String[] hawqModifiers = {}; + HiveUtilities.validateTypeCompatible(DataType.SMALLINT, hawqModifiers, EnumHiveToHawqType.TinyintType.getTypeName(), columnName); + + HiveUtilities.validateTypeCompatible(DataType.SMALLINT, hawqModifiers, EnumHiveToHawqType.SmallintType.getTypeName(), columnName); + + //Both Hive and HAWQ types have the same modifiers + hawqModifiers = new String[]{"38", "18"}; + HiveUtilities.validateTypeCompatible(DataType.NUMERIC, hawqModifiers, "decimal(38,18)", columnName); + + //HAWQ datatype doesn't require modifiers, they are empty, Hive has non-empty modifiers + //Types are compatible in this case + hawqModifiers = new String[]{}; + HiveUtilities.validateTypeCompatible(DataType.NUMERIC, hawqModifiers, "decimal(38,18)", columnName); + hawqModifiers = null; + HiveUtilities.validateTypeCompatible(DataType.NUMERIC, hawqModifiers, "decimal(38,18)", columnName); + + //HAWQ datatype requires modifiers but they aren't provided + //Types aren't compatible + try { + hawqModifiers = new String[]{}; + HiveUtilities.validateTypeCompatible(DataType.VARCHAR, hawqModifiers, "varchar", columnName); + fail("should fail with incompatible modifiers message"); + } + catch (UnsupportedTypeException e) { + String errorMsg = "Invalid definition for column " + columnName + ": modifiers are mandatory for type " + EnumHawqType.VarcharType.getTypeName(); 
+ assertEquals(errorMsg, e.getMessage()); + } + + + //HAWQ has lesser modifiers than Hive, types aren't compatible + try { + hawqModifiers = new String[]{"38", "17"}; + HiveUtilities.validateTypeCompatible(DataType.NUMERIC, hawqModifiers, "decimal(38,18)", columnName); + fail("should fail with incompatible modifiers message"); + } + catch (UnsupportedTypeException e) { + String errorMsg = "Invalid definition for column " + columnName + + ": modifiers are not compatible, " + + Arrays.toString(new String[]{"38", "18"}) + ", " + + Arrays.toString(new String[]{"38", "17"}); + assertEquals(errorMsg, e.getMessage()); + } + } + + @Test + public void extractModifiers() throws Exception { + String[] mods = EnumHiveToHawqType.extractModifiers("decimal(10,2)"); + assertEquals(mods, new String[]{"10", "2"}); + } + + @Test public void mapHiveTypeWithModifiersNegative() throws Exception { String badHiveType = "decimal(2)"; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java index 5e6f6c4..2838232 100644 --- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java @@ -402,9 +402,10 @@ public class ProtocolData extends InputData { } private String[] parseTypeMods(int columnIndex) { - Integer typeModeCount = Integer.parseInt(getOptionalProperty("ATTR-TYPEMOD" + columnIndex + "COUNT")); + String typeModeCountStr = getOptionalProperty("ATTR-TYPEMOD" + columnIndex + "COUNT"); String[] result = null; - if (typeModeCount > 0) { + if (typeModeCountStr != null) { + Integer typeModeCount = 
Integer.parseInt(typeModeCountStr); result = new String[typeModeCount]; for (int i = 0; i < typeModeCount; i++) { result[i] = getProperty("ATTR-TYPEMOD" + columnIndex + "-" + i); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/Bridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/Bridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/Bridge.java new file mode 100644 index 0000000..bfd862a --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/Bridge.java @@ -0,0 +1,40 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import org.apache.hawq.pxf.service.io.Writable; + +import java.io.DataInputStream; + +/** + * Bridge interface - defines the interface of the Bridge classes. Any Bridge + * class acts as an iterator over Hadoop stored data, and should implement + * getNext (for reading) or setNext (for writing) for handling accessed data. 
+ */ +public interface Bridge { + boolean beginIteration() throws Exception; + + Writable getNext() throws Exception; + + boolean setNext(DataInputStream inputStream) throws Exception; + + boolean isThreadSafe(); +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeInputBuilder.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeInputBuilder.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeInputBuilder.java new file mode 100644 index 0000000..4b4d2e8 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeInputBuilder.java @@ -0,0 +1,71 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.OutputFormat; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.service.io.GPDBWritable; +import org.apache.hawq.pxf.service.io.Text; +import org.apache.hawq.pxf.service.utilities.ProtocolData; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.DataInput; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +public class BridgeInputBuilder { + private ProtocolData protocolData; + private static final Log LOG = LogFactory.getLog(BridgeInputBuilder.class); + + public BridgeInputBuilder(ProtocolData protocolData) throws Exception { + this.protocolData = protocolData; + } + + public List<OneField> makeInput(DataInput inputStream) throws Exception { + if (protocolData.outputFormat() == OutputFormat.TEXT) { + Text txt = new Text(); + txt.readFields(inputStream); + return Collections.singletonList(new OneField(DataType.BYTEA.getOID(), txt.getBytes())); + } + + GPDBWritable gpdbWritable = new GPDBWritable(); + gpdbWritable.readFields(inputStream); + + if (gpdbWritable.isEmpty()) { + LOG.debug("Reached end of stream"); + return null; + } + + GPDBWritableMapper mapper = new GPDBWritableMapper(gpdbWritable); + int[] colTypes = gpdbWritable.getColType(); + List<OneField> record = new LinkedList<OneField>(); + for (int i = 0; i < colTypes.length; i++) { + mapper.setDataType(colTypes[i]); + record.add(new OneField(colTypes[i], mapper.getData(i))); + } + + return record; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeOutputBuilder.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeOutputBuilder.java 
b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeOutputBuilder.java new file mode 100644 index 0000000..c59fbea --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeOutputBuilder.java @@ -0,0 +1,394 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.hawq.pxf.api.BadRecordException; +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.OutputFormat; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.service.io.BufferWritable; +import org.apache.hawq.pxf.service.io.GPDBWritable; +import org.apache.hawq.pxf.service.io.GPDBWritable.TypeMismatchException; +import org.apache.hawq.pxf.service.io.Text; +import org.apache.hawq.pxf.service.io.Writable; +import org.apache.hawq.pxf.service.utilities.ProtocolData; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.ObjectUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.lang.reflect.Array; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +import static org.apache.hawq.pxf.api.io.DataType.TEXT; + +/** + * Class creates the output record that is piped by the java process to the HAWQ + * backend. Actually, the output record is serialized and the obtained byte + * string is piped to the HAWQ segment. The output record will implement + * Writable, and the mission of BridgeOutputBuilder will be to translate a list + * of {@link OneField} objects (obtained from the Resolver) into an output + * record. + */ +public class BridgeOutputBuilder { + private ProtocolData inputData; + private Writable output = null; + private LinkedList<Writable> outputList = null; + private Writable partialLine = null; + private GPDBWritable errorRecord = null; + private int[] schema; + private String[] colNames; + private boolean samplingEnabled = false; + private boolean isPartialLine = false; + + private static final byte DELIM = 10; /* (byte)'\n'; */ + + private static final Log LOG = LogFactory.getLog(BridgeOutputBuilder.class); + + /** + * Constructs a BridgeOutputBuilder. 
+ * + * @param input input data, like requested output format and schema + * information + */ + public BridgeOutputBuilder(ProtocolData input) { + inputData = input; + outputList = new LinkedList<Writable>(); + makeErrorRecord(); + samplingEnabled = (inputData.getStatsSampleRatio() > 0); + } + + /** + * We need a separate GPDBWritable record to represent the error record. + * Just setting the errorFlag on the "output" GPDBWritable variable is not + * good enough, since the GPDBWritable is built only after the first record + * is read from the file. And if we encounter an error while fetching the + * first record from the file, then the output member will be null. The + * reason we cannot count on the schema to build the GPDBWritable output + * variable before reading the first record, is because the schema does not + * account for arrays - we cannot know from the schema the length of an + * array. We find out only after fetching the first record. + */ + void makeErrorRecord() { + int[] errSchema = { TEXT.getOID() }; + + if (inputData.outputFormat() != OutputFormat.BINARY) { + return; + } + + errorRecord = new GPDBWritable(errSchema); + errorRecord.setError(true); + } + + /** + * Returns the error record. If the output format is not binary, error + * records are not supported, and the given exception will be thrown + * + * @param ex exception to be stored in record + * @return error record + * @throws Exception if the output format is not binary + */ + public Writable getErrorOutput(Exception ex) throws Exception { + if (inputData.outputFormat() == OutputFormat.BINARY) { + errorRecord.setString(0, ex.getMessage()); + return errorRecord; + } else { + throw ex; + } + } + + /** + * Translates recFields (obtained from the Resolver) into an output record. 
+ * + * @param recFields record fields to be serialized + * @return list of Writable objects with serialized row + * @throws BadRecordException if building the output record failed + */ + public LinkedList<Writable> makeOutput(List<OneField> recFields) + throws BadRecordException { + if (output == null && inputData.outputFormat() == OutputFormat.BINARY) { + makeGPDBWritableOutput(); + } + + outputList.clear(); + + fillOutputRecord(recFields); + + return outputList; + } + + /** + * Returns whether or not this is a partial line. + * + * @return true for a partial line + */ + public Writable getPartialLine() { + return partialLine; + } + + /** + * Creates the GPDBWritable object. The object is created one time and is + * refilled from recFields for each record sent + * + * @return empty GPDBWritable object with set columns + */ + GPDBWritable makeGPDBWritableOutput() { + int num_actual_fields = inputData.getColumns(); + schema = new int[num_actual_fields]; + colNames = new String[num_actual_fields]; + + for (int i = 0; i < num_actual_fields; i++) { + schema[i] = inputData.getColumn(i).columnTypeCode(); + colNames[i] = inputData.getColumn(i).columnName(); + } + + output = new GPDBWritable(schema); + + return (GPDBWritable) output; + } + + /** + * Fills the output record based on the fields in recFields. + * + * @param recFields record fields + * @throws BadRecordException if building the output record failed + */ + void fillOutputRecord(List<OneField> recFields) throws BadRecordException { + if (inputData.outputFormat() == OutputFormat.BINARY) { + fillGPDBWritable(recFields); + } else { + fillText(recFields); + } + } + + /** + * Fills a GPDBWritable object based on recFields. The input record + * recFields must correspond to schema. If the record has more or less + * fields than the schema we throw an exception. We require that the type of + * field[i] in recFields corresponds to the type of field[i] in the schema. 
+ * + * @param recFields record fields + * @throws BadRecordException if building the output record failed + */ + void fillGPDBWritable(List<OneField> recFields) throws BadRecordException { + int size = recFields.size(); + if (size == 0) { // size 0 means the resolver couldn't deserialize any + // of the record fields + throw new BadRecordException("No fields in record"); + } else if (size != schema.length) { + throw new BadRecordException("Record has " + size + + " fields but the schema size is " + schema.length); + } + + for (int i = 0; i < size; i++) { + OneField current = recFields.get(i); + if (!isTypeInSchema(current.type, schema[i])) { + throw new BadRecordException("For field " + colNames[i] + + " schema requires type " + + DataType.get(schema[i]).toString() + + " but input record has type " + + DataType.get(current.type).toString()); + } + + fillOneGPDBWritableField(current, i); + } + + outputList.add(output); + } + + /** + * Tests if data type is a string type. String type is a type that can be + * serialized as string, such as varchar, bpchar, text, numeric, timestamp, + * date. + * + * @param type data type + * @return whether data type is string type + */ + boolean isStringType(DataType type) { + return Arrays.asList(DataType.VARCHAR, DataType.BPCHAR, DataType.TEXT, + DataType.NUMERIC, DataType.TIMESTAMP, DataType.DATE).contains( + type); + } + + /** + * Tests if record field type and schema type correspond. + * + * @param recType record type code + * @param schemaType schema type code + * @return whether record type and schema type match + */ + boolean isTypeInSchema(int recType, int schemaType) { + DataType dtRec = DataType.get(recType); + DataType dtSchema = DataType.get(schemaType); + + return (dtSchema == DataType.UNSUPPORTED_TYPE || dtRec == dtSchema || (isStringType(dtRec) && isStringType(dtSchema))); + } + + /** + * Fills a Text object based on recFields. 
+ * + * @param recFields record fields + * @throws BadRecordException if text formatted record has more than one + * field + */ + void fillText(List<OneField> recFields) throws BadRecordException { + /* + * For the TEXT case there must be only one record in the list + */ + if (recFields.size() != 1) { + throw new BadRecordException( + "BridgeOutputBuilder must receive one field when handling the TEXT format"); + } + + OneField fld = recFields.get(0); + int type = fld.type; + Object val = fld.val; + if (DataType.get(type) == DataType.BYTEA) {// from LineBreakAccessor + if (samplingEnabled) { + convertTextDataToLines((byte[]) val); + } else { + output = new BufferWritable((byte[]) val); + outputList.add(output); // TODO break output into lines + } + } else { // from QuotedLineBreakAccessor + String textRec = (String) val; + output = new Text(textRec + "\n"); + outputList.add(output); + } + } + + /** + * Breaks raw bytes into lines. Used only for sampling. + * + * When sampling a data source, we have to make sure that we deal with + * actual rows (lines) and not bigger chunks of data such as used by + * LineBreakAccessor for performance. The input byte array is broken into + * lines, each one stored in the outputList. In case the read data doesn't + * end with a line delimiter, which can happen when reading chunks of bytes, + * the partial line is stored separately, and is being completed when + * reading the next chunk of data. 
+ * + * @param val input raw data to break into lines + */ + void convertTextDataToLines(byte[] val) { + int len = val.length; + int start = 0; + int end = 0; + byte[] line; + BufferWritable writable; + + while (start < len) { + end = ArrayUtils.indexOf(val, DELIM, start); + if (end == ArrayUtils.INDEX_NOT_FOUND) { + // data finished in the middle of the line + end = len; + isPartialLine = true; + } else { + end++; // include the DELIM character + isPartialLine = false; + } + line = Arrays.copyOfRange(val, start, end); + + if (partialLine != null) { + // partial data was completed + ((BufferWritable) partialLine).append(line); + writable = (BufferWritable) partialLine; + partialLine = null; + } else { + writable = new BufferWritable(line); + } + + if (isPartialLine) { + partialLine = writable; + } else { + outputList.add(writable); + } + start = end; + } + } + + /** + * Fills one GPDBWritable field. + * + * @param oneField field + * @param colIdx column index + * @throws BadRecordException if field type is not supported or doesn't + * match the schema + */ + void fillOneGPDBWritableField(OneField oneField, int colIdx) + throws BadRecordException { + int type = oneField.type; + Object val = oneField.val; + GPDBWritable gpdbOutput = (GPDBWritable) output; + try { + switch (DataType.get(type)) { + case INTEGER: + gpdbOutput.setInt(colIdx, (Integer) val); + break; + case FLOAT8: + gpdbOutput.setDouble(colIdx, (Double) val); + break; + case REAL: + gpdbOutput.setFloat(colIdx, (Float) val); + break; + case BIGINT: + gpdbOutput.setLong(colIdx, (Long) val); + break; + case SMALLINT: + gpdbOutput.setShort(colIdx, (Short) val); + break; + case BOOLEAN: + gpdbOutput.setBoolean(colIdx, (Boolean) val); + break; + case BYTEA: + byte[] bts = null; + if (val != null) { + int length = Array.getLength(val); + bts = new byte[length]; + for (int j = 0; j < length; j++) { + bts[j] = Array.getByte(val, j); + } + } + gpdbOutput.setBytes(colIdx, bts); + break; + case VARCHAR: + case 
BPCHAR: + case CHAR: + case TEXT: + case NUMERIC: + case TIMESTAMP: + case DATE: + gpdbOutput.setString(colIdx, + ObjectUtils.toString(val, null)); + break; + default: + String valClassName = (val != null) ? val.getClass().getSimpleName() + : null; + throw new UnsupportedOperationException(valClassName + + " is not supported for HAWQ conversion"); + } + } catch (TypeMismatchException e) { + throw new BadRecordException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmenterFactory.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmenterFactory.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmenterFactory.java new file mode 100644 index 0000000..c516d69 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmenterFactory.java @@ -0,0 +1,37 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +import org.apache.hawq.pxf.api.Fragmenter; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Utilities; + +/** + * Factory class for creation of {@link Fragmenter} objects. The actual {@link Fragmenter} object is "hidden" behind + * an {@link Fragmenter} abstract class which is returned by the FragmenterFactory. + */ +public class FragmenterFactory { + static public Fragmenter create(InputData inputData) throws Exception { + String fragmenterName = inputData.getFragmenter(); + + return (Fragmenter) Utilities.createAnyInstance(InputData.class, fragmenterName, inputData); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponse.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponse.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponse.java new file mode 100644 index 0000000..d6efcae --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponse.java @@ -0,0 +1,89 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.StreamingOutput; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.codehaus.jackson.map.ObjectMapper; + +import org.apache.hawq.pxf.api.Fragment; + +/** + * Class for serializing fragments metadata in JSON format. The class implements + * {@link StreamingOutput} so the serialization will be done in a stream and not + * in one bulk, this in order to avoid running out of memory when processing a + * lot of fragments. + */ +public class FragmentsResponse implements StreamingOutput { + + private static final Log Log = LogFactory.getLog(FragmentsResponse.class); + + private List<Fragment> fragments; + + /** + * Constructs fragments response out of a list of fragments + * + * @param fragments fragment list + */ + public FragmentsResponse(List<Fragment> fragments) { + this.fragments = fragments; + } + + /** + * Serializes a fragments list in JSON, To be used as the result string for + * HAWQ. 
An example result is as follows: + * <code>{"PXFFragments":[{"replicas": + * ["sdw1.corp.emc.com","sdw3.corp.emc.com","sdw8.corp.emc.com"], + * "sourceName":"text2.csv", "index":"0","metadata":"<base64 metadata for fragment>", + * "userData":"<data_specific_to_third_party_fragmenter>" + * },{"replicas":["sdw2.corp.emc.com","sdw4.corp.emc.com","sdw5.corp.emc.com" + * ],"sourceName":"text_data.csv","index":"0","metadata": + * "<base64 metadata for fragment>" + * ,"userData":"<data_specific_to_third_party_fragmenter>" + * }]}</code> + */ + @Override + public void write(OutputStream output) throws IOException, + WebApplicationException { + DataOutputStream dos = new DataOutputStream(output); + ObjectMapper mapper = new ObjectMapper(); + + dos.write("{\"PXFFragments\":[".getBytes()); + + String prefix = ""; + for (Fragment fragment : fragments) { + StringBuilder result = new StringBuilder(); + /* metaData and userData are automatically converted to Base64 */ + result.append(prefix).append(mapper.writeValueAsString(fragment)); + prefix = ","; + dos.write(result.toString().getBytes()); + } + + dos.write("]}".getBytes()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java new file mode 100644 index 0000000..14e87f9 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java @@ -0,0 +1,157 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hawq.pxf.api.Fragment; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.List; + +/** + * Utility class for converting Fragments into a {@link FragmentsResponse} that + * will serialize them into JSON format. + */ +public class FragmentsResponseFormatter { + + private static final Log LOG = LogFactory.getLog(FragmentsResponseFormatter.class); + + /** + * Converts Fragments list to FragmentsResponse after replacing host name by + * their respective IPs. + * + * @param fragments list of fragments + * @param data data (e.g. 
path) related to the fragments + * @return FragmentsResponse with given fragments + * @throws UnknownHostException if converting host names to IP fails + */ + public static FragmentsResponse formatResponse(List<Fragment> fragments, + String data) + throws UnknownHostException { + /* print the raw fragment list to log when in debug level */ + if (LOG.isDebugEnabled()) { + LOG.debug("Fragments before conversion to IP list:"); + FragmentsResponseFormatter.printList(fragments, data); + } + + /* HD-2550: convert host names to IPs */ + convertHostsToIPs(fragments); + + updateFragmentIndex(fragments); + + /* print the fragment list to log when in debug level */ + if (LOG.isDebugEnabled()) { + FragmentsResponseFormatter.printList(fragments, data); + } + + return new FragmentsResponse(fragments); + } + + /** + * Updates the fragments' indexes so that it is incremented by sourceName. + * (E.g.: {"a", 0}, {"a", 1}, {"b", 0} ... ) + * + * @param fragments fragments to be updated + */ + private static void updateFragmentIndex(List<Fragment> fragments) { + + String sourceName = null; + int index = 0; + for (Fragment fragment : fragments) { + + String currentSourceName = fragment.getSourceName(); + if (!currentSourceName.equals(sourceName)) { + index = 0; + sourceName = currentSourceName; + } + fragment.setIndex(index++); + } + } + + /** + * Converts hosts to their matching IP addresses. + * + * @throws UnknownHostException if converting host name to IP fails + */ + private static void convertHostsToIPs(List<Fragment> fragments) + throws UnknownHostException { + /* host converted to IP map. Used to limit network calls. 
*/ + HashMap<String, String> hostToIpMap = new HashMap<String, String>(); + + for (Fragment fragment : fragments) { + String[] hosts = fragment.getReplicas(); + if (hosts == null) { + continue; + } + String[] ips = new String[hosts.length]; + int index = 0; + + for (String host : hosts) { + String convertedIp = hostToIpMap.get(host); + if (convertedIp == null) { + /* find host's IP, and add to map */ + InetAddress addr = InetAddress.getByName(host); + convertedIp = addr.getHostAddress(); + hostToIpMap.put(host, convertedIp); + } + + /* update IPs array */ + ips[index] = convertedIp; + ++index; + } + fragment.setReplicas(ips); + } + } + + /* + * Converts a fragments list to a readable string and prints it to the log. + * Intended for debugging purposes only. 'datapath' is the data path part of + * the original URI (e.g., table name, *.csv, etc). + */ + private static void printList(List<Fragment> fragments, String datapath) { + LOG.debug("List of " + (fragments.isEmpty() ? "no" : fragments.size()) + + "fragments for \"" + datapath + "\""); + + int i = 0; + for (Fragment fragment : fragments) { + StringBuilder result = new StringBuilder(); + result.append("Fragment #").append(++i).append(": [").append( + "Source: ").append(fragment.getSourceName()).append( + ", Index: ").append(fragment.getIndex()).append( + ", Replicas:"); + for (String host : fragment.getReplicas()) { + result.append(" ").append(host); + } + + result.append(", Metadata: ").append( + new String(fragment.getMetadata())); + + if (fragment.getUserData() != null) { + result.append(", User Data: ").append( + new String(fragment.getUserData())); + } + result.append("] "); + LOG.debug(result); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/GPDBWritableMapper.java ---------------------------------------------------------------------- diff --git 
a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/GPDBWritableMapper.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/GPDBWritableMapper.java new file mode 100644 index 0000000..e1c2eb4 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/GPDBWritableMapper.java @@ -0,0 +1,135 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import org.apache.hawq.pxf.api.UnsupportedTypeException; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.service.io.GPDBWritable; +import org.apache.hawq.pxf.service.io.GPDBWritable.TypeMismatchException; + +/* + * Class for mapping GPDBWritable get functions to java types. 
+ */ +public class GPDBWritableMapper { + + private GPDBWritable gpdbWritable; + private int type; + private DataGetter getter = null; + + public GPDBWritableMapper(GPDBWritable gpdbWritable) { + this.gpdbWritable = gpdbWritable; + } + + public void setDataType(int type) throws UnsupportedTypeException { + this.type = type; + + switch (DataType.get(type)) { + case BOOLEAN: + getter = new BooleanDataGetter(); + break; + case BYTEA: + getter = new BytesDataGetter(); + break; + case BIGINT: + getter = new LongDataGetter(); + break; + case SMALLINT: + getter = new ShortDataGetter(); + break; + case INTEGER: + getter = new IntDataGetter(); + break; + case TEXT: + getter = new StringDataGetter(); + break; + case REAL: + getter = new FloatDataGetter(); + break; + case FLOAT8: + getter = new DoubleDataGetter(); + break; + default: + throw new UnsupportedTypeException( + "Type " + GPDBWritable.getTypeName(type) + + " is not supported by GPDBWritable"); + } + } + + public Object getData(int colIdx) throws TypeMismatchException { + return getter.getData(colIdx); + } + + private interface DataGetter { + abstract Object getData(int colIdx) throws TypeMismatchException; + } + + private class BooleanDataGetter implements DataGetter { + public Object getData(int colIdx) throws TypeMismatchException { + return gpdbWritable.getBoolean(colIdx); + } + } + + private class BytesDataGetter implements DataGetter { + public Object getData(int colIdx) throws TypeMismatchException { + return gpdbWritable.getBytes(colIdx); + } + } + + private class DoubleDataGetter implements DataGetter { + public Object getData(int colIdx) throws TypeMismatchException { + return gpdbWritable.getDouble(colIdx); + } + } + + private class FloatDataGetter implements DataGetter { + public Object getData(int colIdx) throws TypeMismatchException { + return gpdbWritable.getFloat(colIdx); + } + } + + private class IntDataGetter implements DataGetter { + public Object getData(int colIdx) throws TypeMismatchException { 
+ return gpdbWritable.getInt(colIdx); + } + } + + private class LongDataGetter implements DataGetter { + public Object getData(int colIdx) throws TypeMismatchException { + return gpdbWritable.getLong(colIdx); + } + } + + private class ShortDataGetter implements DataGetter { + public Object getData(int colIdx) throws TypeMismatchException { + return gpdbWritable.getShort(colIdx); + } + } + + private class StringDataGetter implements DataGetter { + public Object getData(int colIdx) throws TypeMismatchException { + return gpdbWritable.getString(colIdx); + } + } + + public String toString() { + return "getter type = " + GPDBWritable.getTypeName(type); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataFetcherFactory.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataFetcherFactory.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataFetcherFactory.java new file mode 100644 index 0000000..396b711 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataFetcherFactory.java @@ -0,0 +1,36 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import org.apache.hawq.pxf.api.MetadataFetcher; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Utilities; + +/** + * Factory class for creation of {@link MetadataFetcher} objects. + * The actual {@link MetadataFetcher} object is "hidden" behind an {@link MetadataFetcher} + * abstract class which is returned by the MetadataFetcherFactory. + */ +public class MetadataFetcherFactory { + public static MetadataFetcher create(InputData inputData) throws Exception { + return (MetadataFetcher) Utilities.createAnyInstance(InputData.class, inputData.getMetadata(), inputData); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponse.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponse.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponse.java new file mode 100644 index 0000000..741e201 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponse.java @@ -0,0 +1,93 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import java.util.List; + +import javax.ws.rs.core.StreamingOutput; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hawq.pxf.api.Metadata; + +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion; + + +/** + * Class for serializing metadata in JSON format. The class implements + * {@link StreamingOutput} so the serialization will be done in a stream and not + * in one bulk, this in order to avoid running out of memory when processing a + * lot of items. + */ +public class MetadataResponse implements StreamingOutput { + + private static final Log Log = LogFactory.getLog(MetadataResponse.class); + private static final String METADATA_DEFAULT_RESPONSE = "{\"PXFMetadata\":[]}"; + + private List<Metadata> metadataList; + + /** + * Constructs metadata response out of a metadata list + * + * @param metadataList metadata list + */ + public MetadataResponse(List<Metadata> metadataList) { + this.metadataList = metadataList; + } + + /** + * Serializes the metadata list in JSON, To be used as the result string for HAWQ. 
+ */ + @Override + public void write(OutputStream output) throws IOException { + DataOutputStream dos = new DataOutputStream(output); + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(org.codehaus.jackson.map.SerializationConfig.Feature.USE_ANNOTATIONS, true); // enable annotations for serialization + mapper.setSerializationInclusion(Inclusion.NON_EMPTY); // ignore empty fields + + if(metadataList == null || metadataList.isEmpty()) { + dos.write(METADATA_DEFAULT_RESPONSE.getBytes()); + return; + } + + dos.write("{\"PXFMetadata\":[".getBytes()); + + String prefix = ""; + for (Metadata metadata : metadataList) { + if(metadata == null) { + throw new IllegalArgumentException("metadata object is null - cannot serialize"); + } + if ((metadata.getFields() == null) || metadata.getFields().isEmpty()) { + throw new IllegalArgumentException("metadata for " + metadata.getItem() + " contains no fields - cannot serialize"); + } + StringBuilder result = new StringBuilder(); + result.append(prefix).append(mapper.writeValueAsString(metadata)); + prefix = ","; + dos.write(result.toString().getBytes()); + } + + dos.write("]}".getBytes()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java new file mode 100644 index 0000000..8225ec5 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java @@ -0,0 +1,95 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion; + +import org.apache.hawq.pxf.api.Metadata; + +/** + * Utility class for converting {@link Metadata} into a JSON format. + */ +public class MetadataResponseFormatter { + + private static final Log LOG = LogFactory.getLog(MetadataResponseFormatter.class); + + /** + * Converts list of {@link Metadata} to JSON String format. + * + * @param metadataList list of metadata objects to convert + * @param path path string + * @return JSON formatted response + * @throws IOException if converting the data to JSON fails + */ + public static MetadataResponse formatResponse(List<Metadata> metadataList, String path) throws IOException { + /* print the fragment list to log when in debug level */ + if (LOG.isDebugEnabled()) { + MetadataResponseFormatter.printMetadata(metadataList, path); + } + + return new MetadataResponse(metadataList); + } + + /** + * Converts metadata list to a readable string. + * Intended for debugging purposes only. 
+ */ + private static void printMetadata(List<Metadata> metadataList, String path) { + LOG.debug("Metadata List for path " + path + ": "); + + if (null == metadataList || metadataList.isEmpty()) { + LOG.debug("No metadata"); + return; + } + + for(Metadata metadata: metadataList) { + StringBuilder result = new StringBuilder(); + + if (metadata == null) { + result.append("None"); + LOG.debug(result); + continue; + } + + result.append("Metadata for item \"").append(metadata.getItem()).append("\": "); + + if ((metadata.getFields() == null) || metadata.getFields().isEmpty()) { + result.append("None"); + } else { + int i = 0; + for (Metadata.Field field : metadata.getFields()) { + result.append("Field #").append(++i).append(": [") + .append("Name: ").append(field.getName()) + .append(", Type: ").append(field.getType().getTypeName()) + .append(", Source type: ").append(field.getSourceType()).append("] "); + } + } + LOG.debug(result); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java new file mode 100644 index 0000000..01a95ab --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java @@ -0,0 +1,179 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hawq.pxf.api.BadRecordException; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.ReadResolver; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; +import org.apache.hawq.pxf.api.utilities.Utilities; +import org.apache.hawq.pxf.service.io.Writable; +import org.apache.hawq.pxf.service.utilities.ProtocolData; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.*; +import java.nio.charset.CharacterCodingException; +import java.util.LinkedList; +import java.util.zip.ZipException; + +/** + * ReadBridge creates the accessor and resolver plugins named in the ProtocolData, plus a + * BridgeOutputBuilder that produces the output format (e.g. Text or GPDBWritable). It + * reads records with the accessor, lets the resolver deserialize them into fields and + * reserializes those fields with the output builder. <br> + * Bad records (BadRecordException) and data-related IOExceptions (see isDataException) + * do not fail the read; each one is turned into an error output record for HAWQ. + */ +public class ReadBridge implements Bridge { + ReadAccessor fileAccessor = null; + ReadResolver fieldsResolver = null; + BridgeOutputBuilder outputBuilder = null; + LinkedList<Writable> outputQueue = null; /* output records produced but not yet returned by getNext */ + + private static final Log LOG = LogFactory.getLog(ReadBridge.class); + + /** + * C'tor: creates the output builder and instantiates the accessor and resolver plugins.
+ * + * @param protData input containing accessor and resolver names + * @throws Exception if accessor or resolver can't be instantiated + */ + public ReadBridge(ProtocolData protData) throws Exception { + outputBuilder = new BridgeOutputBuilder(protData); + outputQueue = new LinkedList<Writable>(); + fileAccessor = getFileAccessor(protData); + fieldsResolver = getFieldsResolver(protData); + } + + /** + * Opens the accessor for reading and returns the result of its openForRead(). + */ + @Override + public boolean beginIteration() throws Exception { + return fileAccessor.openForRead(); + } + + /** + * Returns the next record for the HAWQ backend (previously queued output first), or null once the accessor + * is exhausted and no partial line remains. BadRecordException and data-related IOExceptions yield an error record. + */ + @Override + public Writable getNext() throws Exception { + Writable output = null; + OneRow onerow = null; + + if (!outputQueue.isEmpty()) { + return outputQueue.pop(); + } + + try { + while (outputQueue.isEmpty()) { + onerow = fileAccessor.readNextObject(); + if (onerow == null) { + fileAccessor.closeForRead(); + output = outputBuilder.getPartialLine(); + if (output != null) { + LOG.warn("A partial record in the end of the fragment"); + } + // end of data: return the trailing partial line if there is + // one, otherwise null + return output; + } + + // outputQueue is known to be empty here, so it is safe to + // replace it with the builder's new output.
+ outputQueue = outputBuilder.makeOutput(fieldsResolver.getFields(onerow)); + if (!outputQueue.isEmpty()) { + output = outputQueue.pop(); + break; + } + } + } catch (IOException ex) { /* data-related IOExceptions become an error record; others close the accessor and propagate */ + if (!isDataException(ex)) { + fileAccessor.closeForRead(); + throw ex; + } + output = outputBuilder.getErrorOutput(ex); + } catch (BadRecordException ex) { + String row_info = "null"; + if (onerow != null) { + row_info = onerow.toString(); + } + if (ex.getCause() != null) { + LOG.debug("BadRecordException " + ex.getCause().toString() + + ": " + row_info); + } else { + LOG.debug(ex.toString() + ": " + row_info); + } + output = outputBuilder.getErrorOutput(ex); + } catch (Exception ex) { /* NOTE(review): if closeForRead() throws here, ex is lost; consider ex.addSuppressed */ + fileAccessor.closeForRead(); + throw ex; + } + + return output; + } + + public static ReadAccessor getFileAccessor(InputData inputData) + throws Exception { /* instantiates the class named by inputData.getAccessor() via Utilities.createAnyInstance */ + return (ReadAccessor) Utilities.createAnyInstance(InputData.class, + inputData.getAccessor(), inputData); + } + + public static ReadResolver getFieldsResolver(InputData inputData) + throws Exception { /* instantiates the class named by inputData.getResolver() via Utilities.createAnyInstance */ + return (ReadResolver) Utilities.createAnyInstance(InputData.class, + inputData.getResolver(), inputData); + } + + /* + * Many exceptions extend IOException. Some of them, like + * EOFException, are caused by a data problem rather than by an + * IO/connection problem, as the parent IOException type might suggest. + * For example, an EOFException will be thrown while fetching a record from + * a sequence file if there is a formatting problem in the record. Fetching a + * record from the sequence file is the responsibility of the accessor, so + * the exception will be thrown from the accessor. We identify these cases by + * analyzing the exception type, and when the actual problem was a data + * problem, we return an error output record (outputBuilder.getErrorOutput) instead.
+ */ + private boolean isDataException(IOException ex) { + return (ex instanceof EOFException + || ex instanceof CharacterCodingException + || ex instanceof CharConversionException + || ex instanceof UTFDataFormatException || ex instanceof ZipException); + } + + @Override + public boolean setNext(DataInputStream inputStream) { /* read-only bridge: writing is not supported */ + throw new UnsupportedOperationException("setNext is not implemented"); + } + + @Override + public boolean isThreadSafe() { /* thread-safe only if both the accessor and the resolver report so */ + boolean result = ((Plugin) fileAccessor).isThreadSafe() + && ((Plugin) fieldsResolver).isThreadSafe(); + LOG.debug("Bridge is " + (result ? "" : "not ") + "thread safe"); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java new file mode 100644 index 0000000..d5ae66a --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java @@ -0,0 +1,131 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.DataInputStream; +import java.util.BitSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hawq.pxf.service.io.Writable; +import org.apache.hawq.pxf.service.utilities.AnalyzeUtils; +import org.apache.hawq.pxf.service.utilities.ProtocolData; + +/** + * ReadSamplingBridge wraps a ReadBridge and returns only some of the output + * records, based on a sampling ratio. The decision to pass or discard a record is + * made after all of the processing is completed ( + * {@code accessor -> resolver -> output builder}) so that whole records, rather than + * chunks of data, are sampled. <br> + * The goal is sampling that is as uniform as possible. This is achieved by + * creating a bit map matching the precision of the sampleRatio, so that for a + * ratio of 0.034, a bit map of 1000 bits will be created, and 34 bits will be + * set. This map is matched against each read record, discarding ones with a 0 + * bit and continuing until a 1 bit record is read. + */ +public class ReadSamplingBridge implements Bridge { + + ReadBridge bridge; + + float sampleRatio; + BitSet sampleBitSet; + int bitSetSize; + int sampleSize; + int curIndex; /* index in sampleBitSet for the next record read */ + + private static final Log LOG = LogFactory.getLog(ReadSamplingBridge.class); + + /** + * C'tor: wraps a new ReadBridge and builds the sampling bit set from the stats sample ratio. + * + * @param protData input containing the sampling ratio (and the plugin names for the wrapped ReadBridge) + * @throws Exception if the wrapped ReadBridge cannot be created; IllegalArgumentException if the ratio is outside [0.0001, 1.0] + */ + public ReadSamplingBridge(ProtocolData protData) throws Exception { + bridge = new ReadBridge(protData); + + this.sampleRatio = protData.getStatsSampleRatio(); + if (sampleRatio < 0.0001 || sampleRatio > 1.0) { + throw new IllegalArgumentException( + "sampling ratio must be a value between 0.0001 and 1.0.
" + "(value = " + sampleRatio + ")"); + } + + calculateBitSetSize(); + + this.sampleBitSet = AnalyzeUtils.generateSamplingBitSet(bitSetSize, + sampleSize); + this.curIndex = 0; + } + + private void calculateBitSetSize() { /* start from (ratio x 10000) set bits out of 10000 (the int cast truncates digits past the 4th decimal), then divide both by 10 while sampleSize is a multiple of 10, keeping at least 100 bits */ + + sampleSize = (int) (sampleRatio * 10000); + bitSetSize = 10000; + + while ((bitSetSize > 100) && (sampleSize % 10 == 0)) { + bitSetSize /= 10; + sampleSize /= 10; + } + LOG.debug("bit set size = " + bitSetSize + " sample size = " + + sampleSize); + } + + /** + * Returns the next record whose bit in sampleBitSet is set, discarding the others; returns null when the wrapped bridge is exhausted. + */ + @Override + public Writable getNext() throws Exception { + Writable output = bridge.getNext(); + + // skip records whose bit is clear; stop early at end of data (null) + while (!sampleBitSet.get(curIndex)) { + + if (output == null) { + break; + } + incIndex(); + output = bridge.getNext(); + } + + incIndex(); + return output; + } + + private void incIndex() { /* advance to the next bit, wrapping back to 0 after bitSetSize bits */ + curIndex = (++curIndex) % bitSetSize; + } + + @Override + public boolean beginIteration() throws Exception { /* delegates to the wrapped ReadBridge */ + return bridge.beginIteration(); + } + + @Override + public boolean setNext(DataInputStream inputStream) throws Exception { /* delegates to the wrapped ReadBridge, whose setNext throws UnsupportedOperationException */ + return bridge.setNext(inputStream); + } + + @Override + public boolean isThreadSafe() { /* delegates to the wrapped ReadBridge */ + return bridge.isThreadSafe(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java new file mode 100644 index 0000000..c3ee731 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java @@ -0,0 +1,117 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import org.apache.hawq.pxf.api.*; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; +import org.apache.hawq.pxf.api.utilities.Utilities; +import org.apache.hawq.pxf.service.io.Writable; +import org.apache.hawq.pxf.service.utilities.ProtocolData; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.DataInputStream; +import java.util.List; + +/** + * WriteBridge creates the accessor and resolver plugins named in the ProtocolData. + * It reads records from the input stream with a BridgeInputBuilder, converts them with the resolver, + * and writes them to the underlying storage with the accessor.
+ */ +public class WriteBridge implements Bridge { + private static final Log LOG = LogFactory.getLog(WriteBridge.class); + WriteAccessor fileAccessor = null; + WriteResolver fieldsResolver = null; + BridgeInputBuilder inputBuilder; + + /** + * C'tor: creates the input builder and instantiates the accessor and resolver plugins + */ + public WriteBridge(ProtocolData protocolData) throws Exception { + + inputBuilder = new BridgeInputBuilder(protocolData); + /* plugins accept InputData parameters */ + fileAccessor = getFileAccessor(protocolData); + fieldsResolver = getFieldsResolver(protocolData); + + } + + /** + * Opens the accessor for writing and returns the result of its openForWrite() + */ + @Override + public boolean beginIteration() throws Exception { + return fileAccessor.openForWrite(); + } + + /** + * Reads one record from the stream, converts it into a OneRow with the resolver and writes it with the accessor. + * Returns false (after closing the accessor) when the input builder or the resolver returns null; closes the accessor and throws BadRecordException if the write fails. + */ + @Override + public boolean setNext(DataInputStream inputStream) throws Exception { + + List<OneField> record = inputBuilder.makeInput(inputStream); + if (record == null) { + close(); + return false; + } + + OneRow onerow = fieldsResolver.setFields(record); + if (onerow == null) { + close(); + return false; + } + if (!fileAccessor.writeNextObject(onerow)) { + close(); + throw new BadRecordException(); + } + return true; + } + + private void close() throws Exception { /* closes the accessor; on failure logs the message (NOTE(review): not the stack trace) and rethrows */ + try { + fileAccessor.closeForWrite(); + } catch (Exception e) { + LOG.error("Failed to close bridge resources: " + e.getMessage()); + throw e; + } + } + + private static WriteAccessor getFileAccessor(InputData inputData) throws Exception { /* instantiates the class named by inputData.getAccessor() via Utilities.createAnyInstance */ + return (WriteAccessor) Utilities.createAnyInstance(InputData.class, inputData.getAccessor(), inputData); + } + + private static WriteResolver getFieldsResolver(InputData inputData) throws Exception { /* instantiates the class named by inputData.getResolver() via Utilities.createAnyInstance */ + return (WriteResolver) Utilities.createAnyInstance(InputData.class, inputData.getResolver(), inputData); + } + + @Override + public Writable getNext() { /* write-only bridge: reading is not supported */ + throw new UnsupportedOperationException("getNext is not
implemented"); + } + + @Override + public boolean isThreadSafe() { /* thread-safe only if both the accessor and the resolver report so */ + return ((Plugin) fileAccessor).isThreadSafe() && ((Plugin) fieldsResolver).isThreadSafe(); + } +}
