HAWQ-992. PXF Hive data type check in Fragmenter too restrictive.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/30aecce5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/30aecce5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/30aecce5 Branch: refs/heads/HAWQ-992 Commit: 30aecce5aee1d45f9326618d255e64dbd2cdfdc5 Parents: e2416f4 Author: Oleksandr Diachenko <[email protected]> Authored: Fri Aug 26 16:05:32 2016 -0700 Committer: Oleksandr Diachenko <[email protected]> Committed: Fri Aug 26 16:05:32 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hawq/pxf/service/Bridge.java | 40 - .../hawq/pxf/service/BridgeInputBuilder.java | 71 -- .../hawq/pxf/service/BridgeOutputBuilder.java | 394 -------- .../hawq/pxf/service/FragmenterFactory.java | 37 - .../hawq/pxf/service/FragmentsResponse.java | 89 -- .../pxf/service/FragmentsResponseFormatter.java | 157 ---- .../hawq/pxf/service/GPDBWritableMapper.java | 135 --- .../pxf/service/MetadataFetcherFactory.java | 36 - .../hawq/pxf/service/MetadataResponse.java | 93 -- .../pxf/service/MetadataResponseFormatter.java | 95 -- .../org/apache/hawq/pxf/service/ReadBridge.java | 179 ---- .../hawq/pxf/service/ReadSamplingBridge.java | 131 --- .../apache/hawq/pxf/service/WriteBridge.java | 117 --- .../hawq/pxf/service/io/BufferWritable.java | 98 -- .../hawq/pxf/service/io/GPDBWritable.java | 893 ------------------- .../org/apache/hawq/pxf/service/io/Text.java | 399 --------- .../apache/hawq/pxf/service/io/Writable.java | 50 -- .../apache/hawq/pxf/service/package-info.java | 23 - .../hawq/pxf/service/rest/BridgeResource.java | 189 ---- .../pxf/service/rest/ClusterNodesResource.java | 148 --- .../pxf/service/rest/FragmenterResource.java | 154 ---- .../pxf/service/rest/InvalidPathResource.java | 179 ---- .../hawq/pxf/service/rest/MetadataResource.java | 124 --- .../hawq/pxf/service/rest/RestResource.java | 71 -- .../service/rest/ServletLifecycleListener.java | 63 -- .../hawq/pxf/service/rest/VersionResource.java | 88 -- .../hawq/pxf/service/rest/WritableResource.java | 174 ---- .../pxf/service/utilities/AnalyzeUtils.java | 147 --- .../service/utilities/CustomWebappLoader.java | 231 ----- .../pxf/service/utilities/Log4jConfigure.java | 66 -- .../pxf/service/utilities/ProtocolData.java | 491 ---------- .../hawq/pxf/service/utilities/SecureLogin.java | 61 -- .../hawq/pxf/service/utilities/SecuredHDFS.java | 114 --- 33 files changed, 5337 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/30aecce5/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/Bridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/Bridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/Bridge.java deleted file mode 100644 index bfd862a..0000000 --- a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/Bridge.java +++ /dev/null @@ -1,40 +0,0 @@ -package org.apache.hawq.pxf.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import org.apache.hawq.pxf.service.io.Writable; - -import java.io.DataInputStream; - -/** - * Bridge interface - defines the interface of the Bridge classes. Any Bridge - * class acts as an iterator over Hadoop stored data, and should implement - * getNext (for reading) or setNext (for writing) for handling accessed data. - */ -public interface Bridge { - boolean beginIteration() throws Exception; - - Writable getNext() throws Exception; - - boolean setNext(DataInputStream inputStream) throws Exception; - - boolean isThreadSafe(); -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/30aecce5/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeInputBuilder.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeInputBuilder.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeInputBuilder.java deleted file mode 100644 index 4b4d2e8..0000000 --- a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeInputBuilder.java +++ /dev/null @@ -1,71 +0,0 @@ -package org.apache.hawq.pxf.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import org.apache.hawq.pxf.api.OneField; -import org.apache.hawq.pxf.api.OutputFormat; -import org.apache.hawq.pxf.api.io.DataType; -import org.apache.hawq.pxf.service.io.GPDBWritable; -import org.apache.hawq.pxf.service.io.Text; -import org.apache.hawq.pxf.service.utilities.ProtocolData; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.io.DataInput; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; - -public class BridgeInputBuilder { - private ProtocolData protocolData; - private static final Log LOG = LogFactory.getLog(BridgeInputBuilder.class); - - public BridgeInputBuilder(ProtocolData protocolData) throws Exception { - this.protocolData = protocolData; - } - - public List<OneField> makeInput(DataInput inputStream) throws Exception { - if (protocolData.outputFormat() == OutputFormat.TEXT) { - Text txt = new Text(); - txt.readFields(inputStream); - return Collections.singletonList(new OneField(DataType.BYTEA.getOID(), txt.getBytes())); - } - - GPDBWritable gpdbWritable = new GPDBWritable(); - gpdbWritable.readFields(inputStream); - - if (gpdbWritable.isEmpty()) { - LOG.debug("Reached end of stream"); - return null; - } - - GPDBWritableMapper mapper = new GPDBWritableMapper(gpdbWritable); - int[] colTypes = gpdbWritable.getColType(); - List<OneField> record = new LinkedList<OneField>(); - for (int i = 0; i < colTypes.length; i++) { - mapper.setDataType(colTypes[i]); - record.add(new OneField(colTypes[i], mapper.getData(i))); - } - - return record; - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/30aecce5/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeOutputBuilder.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeOutputBuilder.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeOutputBuilder.java deleted file mode 100644 index c59fbea..0000000 --- a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeOutputBuilder.java +++ /dev/null @@ -1,394 +0,0 @@ -package org.apache.hawq.pxf.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.hawq.pxf.api.BadRecordException; -import org.apache.hawq.pxf.api.OneField; -import org.apache.hawq.pxf.api.OutputFormat; -import org.apache.hawq.pxf.api.io.DataType; -import org.apache.hawq.pxf.service.io.BufferWritable; -import org.apache.hawq.pxf.service.io.GPDBWritable; -import org.apache.hawq.pxf.service.io.GPDBWritable.TypeMismatchException; -import org.apache.hawq.pxf.service.io.Text; -import org.apache.hawq.pxf.service.io.Writable; -import org.apache.hawq.pxf.service.utilities.ProtocolData; - -import org.apache.commons.lang.ArrayUtils; -import org.apache.commons.lang.ObjectUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.lang.reflect.Array; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; - -import static org.apache.hawq.pxf.api.io.DataType.TEXT; - -/** - * Class creates the output record that is piped by the java process to the HAWQ - * backend. Actually, the output record is serialized and the obtained byte - * string is piped to the HAWQ segment. The output record will implement - * Writable, and the mission of BridgeOutputBuilder will be to translate a list - * of {@link OneField} objects (obtained from the Resolver) into an output - * record. - */ -public class BridgeOutputBuilder { - private ProtocolData inputData; - private Writable output = null; - private LinkedList<Writable> outputList = null; - private Writable partialLine = null; - private GPDBWritable errorRecord = null; - private int[] schema; - private String[] colNames; - private boolean samplingEnabled = false; - private boolean isPartialLine = false; - - private static final byte DELIM = 10; /* (byte)'\n'; */ - - private static final Log LOG = LogFactory.getLog(BridgeOutputBuilder.class); - - /** - * Constructs a BridgeOutputBuilder. - * - * @param input input data, like requested output format and schema - * information - */ - public BridgeOutputBuilder(ProtocolData input) { - inputData = input; - outputList = new LinkedList<Writable>(); - makeErrorRecord(); - samplingEnabled = (inputData.getStatsSampleRatio() > 0); - } - - /** - * We need a separate GPDBWritable record to represent the error record. - * Just setting the errorFlag on the "output" GPDBWritable variable is not - * good enough, since the GPDBWritable is built only after the first record - * is read from the file. And if we encounter an error while fetching the - * first record from the file, then the output member will be null. The - * reason we cannot count on the schema to build the GPDBWritable output - * variable before reading the first record, is because the schema does not - * account for arrays - we cannot know from the schema the length of an - * array. We find out only after fetching the first record. - */ - void makeErrorRecord() { - int[] errSchema = { TEXT.getOID() }; - - if (inputData.outputFormat() != OutputFormat.BINARY) { - return; - } - - errorRecord = new GPDBWritable(errSchema); - errorRecord.setError(true); - } - - /** - * Returns the error record. If the output format is not binary, error - * records are not supported, and the given exception will be thrown - * - * @param ex exception to be stored in record - * @return error record - * @throws Exception if the output format is not binary - */ - public Writable getErrorOutput(Exception ex) throws Exception { - if (inputData.outputFormat() == OutputFormat.BINARY) { - errorRecord.setString(0, ex.getMessage()); - return errorRecord; - } else { - throw ex; - } - } - - /** - * Translates recFields (obtained from the Resolver) into an output record. - * - * @param recFields record fields to be serialized - * @return list of Writable objects with serialized row - * @throws BadRecordException if building the output record failed - */ - public LinkedList<Writable> makeOutput(List<OneField> recFields) - throws BadRecordException { - if (output == null && inputData.outputFormat() == OutputFormat.BINARY) { - makeGPDBWritableOutput(); - } - - outputList.clear(); - - fillOutputRecord(recFields); - - return outputList; - } - - /** - * Returns whether or not this is a partial line. - * - * @return true for a partial line - */ - public Writable getPartialLine() { - return partialLine; - } - - /** - * Creates the GPDBWritable object. The object is created one time and is - * refilled from recFields for each record sent - * - * @return empty GPDBWritable object with set columns - */ - GPDBWritable makeGPDBWritableOutput() { - int num_actual_fields = inputData.getColumns(); - schema = new int[num_actual_fields]; - colNames = new String[num_actual_fields]; - - for (int i = 0; i < num_actual_fields; i++) { - schema[i] = inputData.getColumn(i).columnTypeCode(); - colNames[i] = inputData.getColumn(i).columnName(); - } - - output = new GPDBWritable(schema); - - return (GPDBWritable) output; - } - - /** - * Fills the output record based on the fields in recFields. - * - * @param recFields record fields - * @throws BadRecordException if building the output record failed - */ - void fillOutputRecord(List<OneField> recFields) throws BadRecordException { - if (inputData.outputFormat() == OutputFormat.BINARY) { - fillGPDBWritable(recFields); - } else { - fillText(recFields); - } - } - - /** - * Fills a GPDBWritable object based on recFields. The input record - * recFields must correspond to schema. If the record has more or less - * fields than the schema we throw an exception. We require that the type of - * field[i] in recFields corresponds to the type of field[i] in the schema. - * - * @param recFields record fields - * @throws BadRecordException if building the output record failed - */ - void fillGPDBWritable(List<OneField> recFields) throws BadRecordException { - int size = recFields.size(); - if (size == 0) { // size 0 means the resolver couldn't deserialize any - // of the record fields - throw new BadRecordException("No fields in record"); - } else if (size != schema.length) { - throw new BadRecordException("Record has " + size - + " fields but the schema size is " + schema.length); - } - - for (int i = 0; i < size; i++) { - OneField current = recFields.get(i); - if (!isTypeInSchema(current.type, schema[i])) { - throw new BadRecordException("For field " + colNames[i] - + " schema requires type " - + DataType.get(schema[i]).toString() - + " but input record has type " - + DataType.get(current.type).toString()); - } - - fillOneGPDBWritableField(current, i); - } - - outputList.add(output); - } - - /** - * Tests if data type is a string type. String type is a type that can be - * serialized as string, such as varchar, bpchar, text, numeric, timestamp, - * date. - * - * @param type data type - * @return whether data type is string type - */ - boolean isStringType(DataType type) { - return Arrays.asList(DataType.VARCHAR, DataType.BPCHAR, DataType.TEXT, - DataType.NUMERIC, DataType.TIMESTAMP, DataType.DATE).contains( - type); - } - - /** - * Tests if record field type and schema type correspond. - * - * @param recType record type code - * @param schemaType schema type code - * @return whether record type and schema type match - */ - boolean isTypeInSchema(int recType, int schemaType) { - DataType dtRec = DataType.get(recType); - DataType dtSchema = DataType.get(schemaType); - - return (dtSchema == DataType.UNSUPPORTED_TYPE || dtRec == dtSchema || (isStringType(dtRec) && isStringType(dtSchema))); - } - - /** - * Fills a Text object based on recFields. - * - * @param recFields record fields - * @throws BadRecordException if text formatted record has more than one - * field - */ - void fillText(List<OneField> recFields) throws BadRecordException { - /* - * For the TEXT case there must be only one record in the list - */ - if (recFields.size() != 1) { - throw new BadRecordException( - "BridgeOutputBuilder must receive one field when handling the TEXT format"); - } - - OneField fld = recFields.get(0); - int type = fld.type; - Object val = fld.val; - if (DataType.get(type) == DataType.BYTEA) {// from LineBreakAccessor - if (samplingEnabled) { - convertTextDataToLines((byte[]) val); - } else { - output = new BufferWritable((byte[]) val); - outputList.add(output); // TODO break output into lines - } - } else { // from QuotedLineBreakAccessor - String textRec = (String) val; - output = new Text(textRec + "\n"); - outputList.add(output); - } - } - - /** - * Breaks raw bytes into lines. Used only for sampling. - * - * When sampling a data source, we have to make sure that we deal with - * actual rows (lines) and not bigger chunks of data such as used by - * LineBreakAccessor for performance. The input byte array is broken into - * lines, each one stored in the outputList. In case the read data doesn't - * end with a line delimiter, which can happen when reading chunks of bytes, - * the partial line is stored separately, and is being completed when - * reading the next chunk of data. - * - * @param val input raw data to break into lines - */ - void convertTextDataToLines(byte[] val) { - int len = val.length; - int start = 0; - int end = 0; - byte[] line; - BufferWritable writable; - - while (start < len) { - end = ArrayUtils.indexOf(val, DELIM, start); - if (end == ArrayUtils.INDEX_NOT_FOUND) { - // data finished in the middle of the line - end = len; - isPartialLine = true; - } else { - end++; // include the DELIM character - isPartialLine = false; - } - line = Arrays.copyOfRange(val, start, end); - - if (partialLine != null) { - // partial data was completed - ((BufferWritable) partialLine).append(line); - writable = (BufferWritable) partialLine; - partialLine = null; - } else { - writable = new BufferWritable(line); - } - - if (isPartialLine) { - partialLine = writable; - } else { - outputList.add(writable); - } - start = end; - } - } - - /** - * Fills one GPDBWritable field. - * - * @param oneField field - * @param colIdx column index - * @throws BadRecordException if field type is not supported or doesn't - * match the schema - */ - void fillOneGPDBWritableField(OneField oneField, int colIdx) - throws BadRecordException { - int type = oneField.type; - Object val = oneField.val; - GPDBWritable gpdbOutput = (GPDBWritable) output; - try { - switch (DataType.get(type)) { - case INTEGER: - gpdbOutput.setInt(colIdx, (Integer) val); - break; - case FLOAT8: - gpdbOutput.setDouble(colIdx, (Double) val); - break; - case REAL: - gpdbOutput.setFloat(colIdx, (Float) val); - break; - case BIGINT: - gpdbOutput.setLong(colIdx, (Long) val); - break; - case SMALLINT: - gpdbOutput.setShort(colIdx, (Short) val); - break; - case BOOLEAN: - gpdbOutput.setBoolean(colIdx, (Boolean) val); - break; - case BYTEA: - byte[] bts = null; - if (val != null) { - int length = Array.getLength(val); - bts = new byte[length]; - for (int j = 0; j < length; j++) { - bts[j] = Array.getByte(val, j); - } - } - gpdbOutput.setBytes(colIdx, bts); - break; - case VARCHAR: - case BPCHAR: - case CHAR: - case TEXT: - case NUMERIC: - case TIMESTAMP: - case DATE: - gpdbOutput.setString(colIdx, - ObjectUtils.toString(val, null)); - break; - default: - String valClassName = (val != null) ? val.getClass().getSimpleName() - : null; - throw new UnsupportedOperationException(valClassName - + " is not supported for HAWQ conversion"); - } - } catch (TypeMismatchException e) { - throw new BadRecordException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/30aecce5/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmenterFactory.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmenterFactory.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmenterFactory.java deleted file mode 100644 index c516d69..0000000 --- a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmenterFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -package org.apache.hawq.pxf.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import org.apache.hawq.pxf.api.Fragmenter; -import org.apache.hawq.pxf.api.utilities.InputData; -import org.apache.hawq.pxf.api.utilities.Utilities; - -/** - * Factory class for creation of {@link Fragmenter} objects. The actual {@link Fragmenter} object is "hidden" behind - * an {@link Fragmenter} abstract class which is returned by the FragmenterFactory. - */ -public class FragmenterFactory { - static public Fragmenter create(InputData inputData) throws Exception { - String fragmenterName = inputData.getFragmenter(); - - return (Fragmenter) Utilities.createAnyInstance(InputData.class, fragmenterName, inputData); - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/30aecce5/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponse.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponse.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponse.java deleted file mode 100644 index d6efcae..0000000 --- a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponse.java +++ /dev/null @@ -1,89 +0,0 @@ -package org.apache.hawq.pxf.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.List; - -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.StreamingOutput; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.codehaus.jackson.map.ObjectMapper; - -import org.apache.hawq.pxf.api.Fragment; - -/** - * Class for serializing fragments metadata in JSON format. The class implements - * {@link StreamingOutput} so the serialization will be done in a stream and not - * in one bulk, this in order to avoid running out of memory when processing a - * lot of fragments. - */ -public class FragmentsResponse implements StreamingOutput { - - private static final Log Log = LogFactory.getLog(FragmentsResponse.class); - - private List<Fragment> fragments; - - /** - * Constructs fragments response out of a list of fragments - * - * @param fragments fragment list - */ - public FragmentsResponse(List<Fragment> fragments) { - this.fragments = fragments; - } - - /** - * Serializes a fragments list in JSON, To be used as the result string for - * HAWQ. An example result is as follows: - * <code>{"PXFFragments":[{"replicas": - * ["sdw1.corp.emc.com","sdw3.corp.emc.com","sdw8.corp.emc.com"], - * "sourceName":"text2.csv", "index":"0","metadata":"<base64 metadata for fragment>", - * "userData":"<data_specific_to_third_party_fragmenter>" - * },{"replicas":["sdw2.corp.emc.com","sdw4.corp.emc.com","sdw5.corp.emc.com" - * ],"sourceName":"text_data.csv","index":"0","metadata": - * "<base64 metadata for fragment>" - * ,"userData":"<data_specific_to_third_party_fragmenter>" - * }]}</code> - */ - @Override - public void write(OutputStream output) throws IOException, - WebApplicationException { - DataOutputStream dos = new DataOutputStream(output); - ObjectMapper mapper = new ObjectMapper(); - - dos.write("{\"PXFFragments\":[".getBytes()); - - String prefix = ""; - for (Fragment fragment : fragments) { - StringBuilder result = new StringBuilder(); - /* metaData and userData are automatically converted to Base64 */ - result.append(prefix).append(mapper.writeValueAsString(fragment)); - prefix = ","; - dos.write(result.toString().getBytes()); - } - - dos.write("]}".getBytes()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/30aecce5/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java deleted file mode 100644 index 14e87f9..0000000 --- a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java +++ /dev/null @@ -1,157 +0,0 @@ -package org.apache.hawq.pxf.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.hawq.pxf.api.Fragment; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.List; - -/** - * Utility class for converting Fragments into a {@link FragmentsResponse} that - * will serialize them into JSON format. - */ -public class FragmentsResponseFormatter { - - private static final Log LOG = LogFactory.getLog(FragmentsResponseFormatter.class); - - /** - * Converts Fragments list to FragmentsResponse after replacing host name by - * their respective IPs. - * - * @param fragments list of fragments - * @param data data (e.g. path) related to the fragments - * @return FragmentsResponse with given fragments - * @throws UnknownHostException if converting host names to IP fails - */ - public static FragmentsResponse formatResponse(List<Fragment> fragments, - String data) - throws UnknownHostException { - /* print the raw fragment list to log when in debug level */ - if (LOG.isDebugEnabled()) { - LOG.debug("Fragments before conversion to IP list:"); - FragmentsResponseFormatter.printList(fragments, data); - } - - /* HD-2550: convert host names to IPs */ - convertHostsToIPs(fragments); - - updateFragmentIndex(fragments); - - /* print the fragment list to log when in debug level */ - if (LOG.isDebugEnabled()) { - FragmentsResponseFormatter.printList(fragments, data); - } - - return new FragmentsResponse(fragments); - } - - /** - * Updates the fragments' indexes so that it is incremented by sourceName. - * (E.g.: {"a", 0}, {"a", 1}, {"b", 0} ... ) - * - * @param fragments fragments to be updated - */ - private static void updateFragmentIndex(List<Fragment> fragments) { - - String sourceName = null; - int index = 0; - for (Fragment fragment : fragments) { - - String currentSourceName = fragment.getSourceName(); - if (!currentSourceName.equals(sourceName)) { - index = 0; - sourceName = currentSourceName; - } - fragment.setIndex(index++); - } - } - - /** - * Converts hosts to their matching IP addresses. - * - * @throws UnknownHostException if converting host name to IP fails - */ - private static void convertHostsToIPs(List<Fragment> fragments) - throws UnknownHostException { - /* host converted to IP map. Used to limit network calls. */ - HashMap<String, String> hostToIpMap = new HashMap<String, String>(); - - for (Fragment fragment : fragments) { - String[] hosts = fragment.getReplicas(); - if (hosts == null) { - continue; - } - String[] ips = new String[hosts.length]; - int index = 0; - - for (String host : hosts) { - String convertedIp = hostToIpMap.get(host); - if (convertedIp == null) { - /* find host's IP, and add to map */ - InetAddress addr = InetAddress.getByName(host); - convertedIp = addr.getHostAddress(); - hostToIpMap.put(host, convertedIp); - } - - /* update IPs array */ - ips[index] = convertedIp; - ++index; - } - fragment.setReplicas(ips); - } - } - - /* - * Converts a fragments list to a readable string and prints it to the log. - * Intended for debugging purposes only. 'datapath' is the data path part of - * the original URI (e.g., table name, *.csv, etc). - */ - private static void printList(List<Fragment> fragments, String datapath) { - LOG.debug("List of " + (fragments.isEmpty() ? "no" : fragments.size()) - + "fragments for \"" + datapath + "\""); - - int i = 0; - for (Fragment fragment : fragments) { - StringBuilder result = new StringBuilder(); - result.append("Fragment #").append(++i).append(": [").append( - "Source: ").append(fragment.getSourceName()).append( - ", Index: ").append(fragment.getIndex()).append( - ", Replicas:"); - for (String host : fragment.getReplicas()) { - result.append(" ").append(host); - } - - result.append(", Metadata: ").append( - new String(fragment.getMetadata())); - - if (fragment.getUserData() != null) { - result.append(", User Data: ").append( - new String(fragment.getUserData())); - } - result.append("] "); - LOG.debug(result); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/30aecce5/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/GPDBWritableMapper.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/GPDBWritableMapper.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/GPDBWritableMapper.java deleted file mode 100644 index e1c2eb4..0000000 --- a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/GPDBWritableMapper.java +++ /dev/null @@ -1,135 +0,0 @@ -package org.apache.hawq.pxf.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import org.apache.hawq.pxf.api.UnsupportedTypeException; -import org.apache.hawq.pxf.api.io.DataType; -import org.apache.hawq.pxf.service.io.GPDBWritable; -import org.apache.hawq.pxf.service.io.GPDBWritable.TypeMismatchException; - -/* - * Class for mapping GPDBWritable get functions to java types. - */ -public class GPDBWritableMapper { - - private GPDBWritable gpdbWritable; - private int type; - private DataGetter getter = null; - - public GPDBWritableMapper(GPDBWritable gpdbWritable) { - this.gpdbWritable = gpdbWritable; - } - - public void setDataType(int type) throws UnsupportedTypeException { - this.type = type; - - switch (DataType.get(type)) { - case BOOLEAN: - getter = new BooleanDataGetter(); - break; - case BYTEA: - getter = new BytesDataGetter(); - break; - case BIGINT: - getter = new LongDataGetter(); - break; - case SMALLINT: - getter = new ShortDataGetter(); - break; - case INTEGER: - getter = new IntDataGetter(); - break; - case TEXT: - getter = new StringDataGetter(); - break; - case REAL: - getter = new FloatDataGetter(); - break; - case FLOAT8: - getter = new DoubleDataGetter(); - break; - default: - throw new UnsupportedTypeException( - "Type " + GPDBWritable.getTypeName(type) + - " is not supported by GPDBWritable"); - } - } - - public Object getData(int colIdx) throws TypeMismatchException { - return getter.getData(colIdx); - } - - private interface DataGetter { - abstract Object getData(int colIdx) throws TypeMismatchException; - } - - private class BooleanDataGetter implements DataGetter { - public Object getData(int colIdx) throws TypeMismatchException { - return gpdbWritable.getBoolean(colIdx); - } - } - - private class BytesDataGetter implements DataGetter { - public Object getData(int colIdx) throws TypeMismatchException { - return gpdbWritable.getBytes(colIdx); - } - } - - private class DoubleDataGetter implements DataGetter { - public Object getData(int colIdx) throws TypeMismatchException { - return gpdbWritable.getDouble(colIdx); - } - } - - private class FloatDataGetter implements DataGetter { - public Object getData(int colIdx) throws TypeMismatchException { - return gpdbWritable.getFloat(colIdx); - } - } - - private class IntDataGetter implements DataGetter { - public Object getData(int colIdx) throws TypeMismatchException { - return gpdbWritable.getInt(colIdx); - } - } - - private class LongDataGetter implements DataGetter { - public Object getData(int colIdx) throws TypeMismatchException { - return gpdbWritable.getLong(colIdx); - } - } - - private class ShortDataGetter implements DataGetter { - public Object getData(int colIdx) throws TypeMismatchException { - return gpdbWritable.getShort(colIdx); - } - } - - private class StringDataGetter implements DataGetter { - public Object getData(int colIdx) throws TypeMismatchException { - return gpdbWritable.getString(colIdx); - } - } - - public String toString() { - return "getter type = " + GPDBWritable.getTypeName(type); - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/30aecce5/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataFetcherFactory.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataFetcherFactory.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataFetcherFactory.java deleted file mode 100644 index 396b711..0000000 --- a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataFetcherFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.apache.hawq.pxf.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import org.apache.hawq.pxf.api.MetadataFetcher; -import org.apache.hawq.pxf.api.utilities.InputData; -import org.apache.hawq.pxf.api.utilities.Utilities; - -/** - * Factory class for creation of {@link MetadataFetcher} objects. - * The actual {@link MetadataFetcher} object is "hidden" behind an {@link MetadataFetcher} - * abstract class which is returned by the MetadataFetcherFactory. - */ -public class MetadataFetcherFactory { - public static MetadataFetcher create(InputData inputData) throws Exception { - return (MetadataFetcher) Utilities.createAnyInstance(InputData.class, inputData.getMetadata(), inputData); - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/30aecce5/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponse.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponse.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponse.java deleted file mode 100644 index 741e201..0000000 --- a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponse.java +++ /dev/null @@ -1,93 +0,0 @@ -package org.apache.hawq.pxf.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -import java.util.List; - -import javax.ws.rs.core.StreamingOutput; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hawq.pxf.api.Metadata; - -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion; - - -/** - * Class for serializing metadata in JSON format. The class implements - * {@link StreamingOutput} so the serialization will be done in a stream and not - * in one bulk, this in order to avoid running out of memory when processing a - * lot of items. - */ -public class MetadataResponse implements StreamingOutput { - - private static final Log Log = LogFactory.getLog(MetadataResponse.class); - private static final String METADATA_DEFAULT_RESPONSE = "{\"PXFMetadata\":[]}"; - - private List<Metadata> metadataList; - - /** - * Constructs metadata response out of a metadata list - * - * @param metadataList metadata list - */ - public MetadataResponse(List<Metadata> metadataList) { - this.metadataList = metadataList; - } - - /** - * Serializes the metadata list in JSON, To be used as the result string for HAWQ. - */ - @Override - public void write(OutputStream output) throws IOException { - DataOutputStream dos = new DataOutputStream(output); - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(org.codehaus.jackson.map.SerializationConfig.Feature.USE_ANNOTATIONS, true); // enable annotations for serialization - mapper.setSerializationInclusion(Inclusion.NON_EMPTY); // ignore empty fields - - if(metadataList == null || metadataList.isEmpty()) { - dos.write(METADATA_DEFAULT_RESPONSE.getBytes()); - return; - } - - dos.write("{\"PXFMetadata\":[".getBytes()); - - String prefix = ""; - for (Metadata metadata : metadataList) { - if(metadata == null) { - throw new IllegalArgumentException("metadata object is null - cannot serialize"); - } - if ((metadata.getFields() == null) || metadata.getFields().isEmpty()) { - throw new IllegalArgumentException("metadata for " + metadata.getItem() + " contains no fields - cannot serialize"); - } - StringBuilder result = new StringBuilder(); - result.append(prefix).append(mapper.writeValueAsString(metadata)); - prefix = ","; - dos.write(result.toString().getBytes()); - } - - dos.write("]}".getBytes()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/30aecce5/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java deleted file mode 100644 index 8225ec5..0000000 --- a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java +++ /dev/null @@ -1,95 +0,0 @@ -package org.apache.hawq.pxf.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion; - -import org.apache.hawq.pxf.api.Metadata; - -/** - * Utility class for converting {@link Metadata} into a JSON format. - */ -public class MetadataResponseFormatter { - - private static final Log LOG = LogFactory.getLog(MetadataResponseFormatter.class); - - /** - * Converts list of {@link Metadata} to JSON String format. - * - * @param metadataList list of metadata objects to convert - * @param path path string - * @return JSON formatted response - * @throws IOException if converting the data to JSON fails - */ - public static MetadataResponse formatResponse(List<Metadata> metadataList, String path) throws IOException { - /* print the fragment list to log when in debug level */ - if (LOG.isDebugEnabled()) { - MetadataResponseFormatter.printMetadata(metadataList, path); - } - - return new MetadataResponse(metadataList); - } - - /** - * Converts metadata list to a readable string. - * Intended for debugging purposes only. - */ - private static void printMetadata(List<Metadata> metadataList, String path) { - LOG.debug("Metadata List for path " + path + ": "); - - if (null == metadataList || metadataList.isEmpty()) { - LOG.debug("No metadata"); - return; - } - - for(Metadata metadata: metadataList) { - StringBuilder result = new StringBuilder(); - - if (metadata == null) { - result.append("None"); - LOG.debug(result); - continue; - } - - result.append("Metadata for item \"").append(metadata.getItem()).append("\": "); - - if ((metadata.getFields() == null) || metadata.getFields().isEmpty()) { - result.append("None"); - } else { - int i = 0; - for (Metadata.Field field : metadata.getFields()) { - result.append("Field #").append(++i).append(": [") - .append("Name: ").append(field.getName()) - .append(", Type: ").append(field.getType().getTypeName()) - .append(", Source type: ").append(field.getSourceType()).append("] "); - } - } - LOG.debug(result); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/30aecce5/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java deleted file mode 100644 index 01a95ab..0000000 --- a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java +++ /dev/null @@ -1,179 +0,0 @@ -package org.apache.hawq.pxf.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.hawq.pxf.api.BadRecordException; -import org.apache.hawq.pxf.api.OneRow; -import org.apache.hawq.pxf.api.ReadAccessor; -import org.apache.hawq.pxf.api.ReadResolver; -import org.apache.hawq.pxf.api.utilities.InputData; -import org.apache.hawq.pxf.api.utilities.Plugin; -import org.apache.hawq.pxf.api.utilities.Utilities; -import org.apache.hawq.pxf.service.io.Writable; -import org.apache.hawq.pxf.service.utilities.ProtocolData; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.io.*; -import java.nio.charset.CharacterCodingException; -import java.util.LinkedList; -import java.util.zip.ZipException; - -/** - * ReadBridge class creates appropriate accessor and resolver. It will then - * create the correct output conversion class (e.g. Text or GPDBWritable) and - * get records from accessor, let resolver deserialize them and reserialize them - * using the output conversion class. <br> - * The class handles BadRecordException and other exception type and marks the - * record as invalid for HAWQ. - */ -public class ReadBridge implements Bridge { - ReadAccessor fileAccessor = null; - ReadResolver fieldsResolver = null; - BridgeOutputBuilder outputBuilder = null; - LinkedList<Writable> outputQueue = null; - - private static final Log LOG = LogFactory.getLog(ReadBridge.class); - - /** - * C'tor - set the implementation of the bridge. - * - * @param protData input containing accessor and resolver names - * @throws Exception if accessor or resolver can't be instantiated - */ - public ReadBridge(ProtocolData protData) throws Exception { - outputBuilder = new BridgeOutputBuilder(protData); - outputQueue = new LinkedList<Writable>(); - fileAccessor = getFileAccessor(protData); - fieldsResolver = getFieldsResolver(protData); - } - - /** - * Accesses the underlying HDFS file. - */ - @Override - public boolean beginIteration() throws Exception { - return fileAccessor.openForRead(); - } - - /** - * Fetches next object from file and turn it into a record that the HAWQ - * backend can process. - */ - @Override - public Writable getNext() throws Exception { - Writable output = null; - OneRow onerow = null; - - if (!outputQueue.isEmpty()) { - return outputQueue.pop(); - } - - try { - while (outputQueue.isEmpty()) { - onerow = fileAccessor.readNextObject(); - if (onerow == null) { - fileAccessor.closeForRead(); - output = outputBuilder.getPartialLine(); - if (output != null) { - LOG.warn("A partial record in the end of the fragment"); - } - // if there is a partial line, return it now, otherwise it - // will return null - return output; - } - - // we checked before that outputQueue is empty, so we can - // override it. - outputQueue = outputBuilder.makeOutput(fieldsResolver.getFields(onerow)); - if (!outputQueue.isEmpty()) { - output = outputQueue.pop(); - break; - } - } - } catch (IOException ex) { - if (!isDataException(ex)) { - fileAccessor.closeForRead(); - throw ex; - } - output = outputBuilder.getErrorOutput(ex); - } catch (BadRecordException ex) { - String row_info = "null"; - if (onerow != null) { - row_info = onerow.toString(); - } - if (ex.getCause() != null) { - LOG.debug("BadRecordException " + ex.getCause().toString() - + ": " + row_info); - } else { - LOG.debug(ex.toString() + ": " + row_info); - } - output = outputBuilder.getErrorOutput(ex); - } catch (Exception ex) { - fileAccessor.closeForRead(); - throw ex; - } - - return output; - } - - public static ReadAccessor getFileAccessor(InputData inputData) - throws Exception { - return (ReadAccessor) Utilities.createAnyInstance(InputData.class, - inputData.getAccessor(), inputData); - } - - public static ReadResolver getFieldsResolver(InputData inputData) - throws Exception { - return (ReadResolver) Utilities.createAnyInstance(InputData.class, - inputData.getResolver(), inputData); - } - - /* - * There are many exceptions that inherit IOException. Some of them like - * EOFException are generated due to a data problem, and not because of an - * IO/connection problem as the father IOException might lead us to believe. - * For example, an EOFException will be thrown while fetching a record from - * a sequence file, if there is a formatting problem in the record. Fetching - * record from the sequence-file is the responsibility of the accessor so - * the exception will be thrown from the accessor. We identify this cases by - * analyzing the exception type, and when we discover that the actual - * problem was a data problem, we return the errorOutput GPDBWritable. - */ - private boolean isDataException(IOException ex) { - return (ex instanceof EOFException - || ex instanceof CharacterCodingException - || ex instanceof CharConversionException - || ex instanceof UTFDataFormatException || ex instanceof ZipException); - } - - @Override - public boolean setNext(DataInputStream inputStream) { - throw new UnsupportedOperationException("setNext is not implemented"); - } - - @Override - public boolean isThreadSafe() { - boolean result = ((Plugin) fileAccessor).isThreadSafe() - && ((Plugin) fieldsResolver).isThreadSafe(); - LOG.debug("Bridge is " + (result ? "" : "not ") + "thread safe"); - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/30aecce5/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java deleted file mode 100644 index d5ae66a..0000000 --- a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java +++ /dev/null @@ -1,131 +0,0 @@ -package org.apache.hawq.pxf.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.DataInputStream; -import java.util.BitSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.hawq.pxf.service.io.Writable; -import org.apache.hawq.pxf.service.utilities.AnalyzeUtils; -import org.apache.hawq.pxf.service.utilities.ProtocolData; - -/** - * ReadSamplingBridge wraps a ReadBridge, and returns only some of the output - * records, based on a ratio sample. The sample to pass or discard a record is - * done after all of the processing is completed ( - * {@code accessor -> resolver -> output builder}) to make sure there are no - * chunks of data instead of single records. <br> - * The goal is to get as uniform as possible sampling. This is achieved by - * creating a bit map matching the precision of the sampleRatio, so that for a - * ratio of 0.034, a bit-map of 1000 bits will be created, and 34 bits will be - * set. This map is matched against each read record, discarding ones with a 0 - * bit and continuing until a 1 bit record is read. - */ -public class ReadSamplingBridge implements Bridge { - - ReadBridge bridge; - - float sampleRatio; - BitSet sampleBitSet; - int bitSetSize; - int sampleSize; - int curIndex; - - private static final Log LOG = LogFactory.getLog(ReadSamplingBridge.class); - - /** - * C'tor - set the implementation of the bridge. - * - * @param protData input containing sampling ratio - * @throws Exception if the sampling ratio is wrong - */ - public ReadSamplingBridge(ProtocolData protData) throws Exception { - bridge = new ReadBridge(protData); - - this.sampleRatio = protData.getStatsSampleRatio(); - if (sampleRatio < 0.0001 || sampleRatio > 1.0) { - throw new IllegalArgumentException( - "sampling ratio must be a value between 0.0001 and 1.0. " - + "(value = " + sampleRatio + ")"); - } - - calculateBitSetSize(); - - this.sampleBitSet = AnalyzeUtils.generateSamplingBitSet(bitSetSize, - sampleSize); - this.curIndex = 0; - } - - private void calculateBitSetSize() { - - sampleSize = (int) (sampleRatio * 10000); - bitSetSize = 10000; - - while ((bitSetSize > 100) && (sampleSize % 10 == 0)) { - bitSetSize /= 10; - sampleSize /= 10; - } - LOG.debug("bit set size = " + bitSetSize + " sample size = " - + sampleSize); - } - - /** - * Fetches next sample, according to the sampling ratio. - */ - @Override - public Writable getNext() throws Exception { - Writable output = bridge.getNext(); - - // sample - if bit is false, advance to the next object - while (!sampleBitSet.get(curIndex)) { - - if (output == null) { - break; - } - incIndex(); - output = bridge.getNext(); - } - - incIndex(); - return output; - } - - private void incIndex() { - curIndex = (++curIndex) % bitSetSize; - } - - @Override - public boolean beginIteration() throws Exception { - return bridge.beginIteration(); - } - - @Override - public boolean setNext(DataInputStream inputStream) throws Exception { - return bridge.setNext(inputStream); - } - - @Override - public boolean isThreadSafe() { - return bridge.isThreadSafe(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/30aecce5/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java deleted file mode 100644 index c3ee731..0000000 --- a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java +++ /dev/null @@ -1,117 +0,0 @@ -package org.apache.hawq.pxf.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import org.apache.hawq.pxf.api.*; -import org.apache.hawq.pxf.api.utilities.InputData; -import org.apache.hawq.pxf.api.utilities.Plugin; -import org.apache.hawq.pxf.api.utilities.Utilities; -import org.apache.hawq.pxf.service.io.Writable; -import org.apache.hawq.pxf.service.utilities.ProtocolData; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.io.DataInputStream; -import java.util.List; - -/* - * WriteBridge class creates appropriate accessor and resolver. - * It reads data from inputStream by the resolver, - * and writes it to the Hadoop storage with the accessor. - */ -public class WriteBridge implements Bridge { - private static final Log LOG = LogFactory.getLog(WriteBridge.class); - WriteAccessor fileAccessor = null; - WriteResolver fieldsResolver = null; - BridgeInputBuilder inputBuilder; - - /* - * C'tor - set the implementation of the bridge - */ - public WriteBridge(ProtocolData protocolData) throws Exception { - - inputBuilder = new BridgeInputBuilder(protocolData); - /* plugins accept InputData parameters */ - fileAccessor = getFileAccessor(protocolData); - fieldsResolver = getFieldsResolver(protocolData); - - } - - /* - * Accesses the underlying HDFS file - */ - @Override - public boolean beginIteration() throws Exception { - return fileAccessor.openForWrite(); - } - - /* - * Read data from stream, convert it using WriteResolver into OneRow object, and - * pass to WriteAccessor to write into file. - */ - @Override - public boolean setNext(DataInputStream inputStream) throws Exception { - - List<OneField> record = inputBuilder.makeInput(inputStream); - if (record == null) { - close(); - return false; - } - - OneRow onerow = fieldsResolver.setFields(record); - if (onerow == null) { - close(); - return false; - } - if (!fileAccessor.writeNextObject(onerow)) { - close(); - throw new BadRecordException(); - } - return true; - } - - private void close() throws Exception { - try { - fileAccessor.closeForWrite(); - } catch (Exception e) { - LOG.error("Failed to close bridge resources: " + e.getMessage()); - throw e; - } - } - - private static WriteAccessor getFileAccessor(InputData inputData) throws Exception { - return (WriteAccessor) Utilities.createAnyInstance(InputData.class, inputData.getAccessor(), inputData); - } - - private static WriteResolver getFieldsResolver(InputData inputData) throws Exception { - return (WriteResolver) Utilities.createAnyInstance(InputData.class, inputData.getResolver(), inputData); - } - - @Override - public Writable getNext() { - throw new UnsupportedOperationException("getNext is not implemented"); - } - - @Override - public boolean isThreadSafe() { - return ((Plugin) fileAccessor).isThreadSafe() && ((Plugin) fieldsResolver).isThreadSafe(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/30aecce5/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/BufferWritable.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/BufferWritable.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/BufferWritable.java deleted file mode 100644 index 6b911f2..0000000 --- a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/BufferWritable.java +++ /dev/null @@ -1,98 +0,0 @@ -package org.apache.hawq.pxf.service.io; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.lang.UnsupportedOperationException; - -/** - * A serializable object for transporting a byte array through the Bridge - * framework - */ -public class BufferWritable implements Writable { - - byte[] buf = null; - - /** - * Constructs a BufferWritable. Copies the buffer reference and not the - * actual bytes. This class is used when we intend to transport a buffer - * through the Bridge framework without copying the data each time the - * buffer is passed between the Bridge objects. - * - * @param inBuf buffer - */ - public BufferWritable(byte[] inBuf) { - buf = inBuf; - } - - /** - * Serializes the fields of this object to <code>out</code>. - * - * @param out <code>DataOutput</code> to serialize this object into. - * @throws IOException if the buffer was not set - */ - @Override - public void write(DataOutput out) throws IOException { - if (buf == null) - throw new IOException("BufferWritable was not set"); - out.write(buf); - } - - /** - * Deserializes the fields of this object from <code>in</code>. - * <p> - * For efficiency, implementations should attempt to re-use storage in the - * existing object where possible. - * </p> - * - * @param in <code>DataInput</code> to deserialize this object from - * @throws UnsupportedOperationException this function is not supported - */ - @Override - public void readFields(DataInput in) { - throw new UnsupportedOperationException( - "BufferWritable.readFields() is not implemented"); - } - - /** - * Appends given app's buffer to existing buffer. - * <br> - * Not efficient - requires copying both this and the appended buffer. - * - * @param app buffer to append - */ - public void append(byte[] app) { - if (buf == null) { - buf = app; - return; - } - if (app == null) { - return; - } - - byte[] newbuf = new byte[buf.length + app.length]; - System.arraycopy(buf, 0, newbuf, 0, buf.length); - System.arraycopy(app, 0, newbuf, buf.length, app.length); - buf = newbuf; - } -}
