http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/BufferWritable.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/BufferWritable.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/BufferWritable.java new file mode 100644 index 0000000..6b911f2 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/BufferWritable.java @@ -0,0 +1,98 @@ +package org.apache.hawq.pxf.service.io; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.UnsupportedOperationException; + +/** + * A serializable object for transporting a byte array through the Bridge + * framework + */ +public class BufferWritable implements Writable { + + byte[] buf = null; + + /** + * Constructs a BufferWritable. Copies the buffer reference and not the + * actual bytes. 
This class is used when we intend to transport a buffer + * through the Bridge framework without copying the data each time the + * buffer is passed between the Bridge objects. + * + * @param inBuf buffer + */ + public BufferWritable(byte[] inBuf) { + buf = inBuf; + } + + /** + * Serializes the fields of this object to <code>out</code>. + * + * @param out <code>DataOutput</code> to serialize this object into. + * @throws IOException if the buffer was not set + */ + @Override + public void write(DataOutput out) throws IOException { + if (buf == null) + throw new IOException("BufferWritable was not set"); + out.write(buf); + } + + /** + * Deserializes the fields of this object from <code>in</code>. + * <p> + * For efficiency, implementations should attempt to re-use storage in the + * existing object where possible. + * </p> + * + * @param in <code>DataInput</code> to deserialize this object from + * @throws UnsupportedOperationException this function is not supported + */ + @Override + public void readFields(DataInput in) { + throw new UnsupportedOperationException( + "BufferWritable.readFields() is not implemented"); + } + + /** + * Appends given app's buffer to existing buffer. + * <br> + * Not efficient - requires copying both this and the appended buffer. + * + * @param app buffer to append + */ + public void append(byte[] app) { + if (buf == null) { + buf = app; + return; + } + if (app == null) { + return; + } + + byte[] newbuf = new byte[buf.length + app.length]; + System.arraycopy(buf, 0, newbuf, 0, buf.length); + System.arraycopy(app, 0, newbuf, buf.length, app.length); + buf = newbuf; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/GPDBWritable.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/GPDBWritable.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/GPDBWritable.java new file mode 100644 index 0000000..5bc26f1 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/GPDBWritable.java @@ -0,0 +1,893 @@ +package org.apache.hawq.pxf.service.io; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.*; +import java.util.Arrays; + +import static org.apache.hawq.pxf.api.io.DataType.*; + + +/** + * This class represents a GPDB record in the form of + * a Java object. + */ +public class GPDBWritable implements Writable { + /* + * GPDBWritable is using the following serialization form: + * Total Length | Version | Error Flag | # of columns | Col type |...| Col type | Null Bit array | Col val... 
+ * 4 byte | 2 byte | 1 byte | 2 byte | 1 byte |...| 1 byte | ceil(# of columns/8) byte | Fixed or Var length + * + * For fixed length type, we know the length. + * In the col val, we align pad according to the alignment requirement of the type. + * For var length type, the alignment is always 4 byte. + * For var length type, col val is <4 byte length><payload val> + */ + + private static final Log LOG = LogFactory.getLog(GPDBWritable.class); + private static final int EOF = -1; + + /* + * Enum of the Database type + */ + private enum DBType { + BIGINT(8, 8), + BOOLEAN(1, 1), + FLOAT8(8, 8), + INTEGER(4, 4), + REAL(4, 4), + SMALLINT(2, 2), + BYTEA(4, -1), + TEXT(4, -1); + + private final int typelength; // -1 means var length + private final int alignment; + + DBType(int align, int len) { + this.typelength = len; + this.alignment = align; + } + + public int getTypeLength() { + return typelength; + } + + public boolean isVarLength() { + return typelength == -1; + } + + // return the alignment requirement of the type + public int getAlignment() { + return alignment; + } + } + + /* + * Constants + */ + private static final int PREV_VERSION = 1; + private static final int VERSION = 2; /* for backward compatibility */ + private static final String CHARSET = "UTF-8"; + + /* + * Local variables + */ + protected int[] colType; + protected Object[] colValue; + protected int alignmentOfEightBytes = 8; + protected byte errorFlag = 0; + protected int pktlen = EOF; + + public int[] getColType() { + return colType; + } + + /** + * An exception class for column type definition and + * set/get value mismatch. + */ + public class TypeMismatchException extends IOException { + public TypeMismatchException(String msg) { + super(msg); + } + } + + /** + * Empty Constructor + */ + public GPDBWritable() { + initializeEightByteAlignment(); + } + + /** + * Constructor to build a db record. 
colType defines the schema + * + * @param columnType the table column types + */ + public GPDBWritable(int[] columnType) { + initializeEightByteAlignment(); + colType = columnType; + colValue = new Object[columnType.length]; + } + + /** + * Constructor to build a db record from a serialized form. + * + * @param data a record in the serialized form + * @throws IOException if the data is malformatted. + */ + public GPDBWritable(byte[] data) throws IOException { + initializeEightByteAlignment(); + ByteArrayInputStream bis = new ByteArrayInputStream(data); + DataInputStream dis = new DataInputStream(bis); + + readFields(dis); + } + + /* + * Read first 4 bytes, and verify it's a valid packet length. + * Upon error returns EOF. + */ + private int readPktLen(DataInput in) throws IOException { + pktlen = EOF; + + try { + pktlen = in.readInt(); + } catch (EOFException e) { + LOG.debug("Reached end of stream (EOFException)"); + return EOF; + } + if (pktlen == EOF) { + LOG.debug("Reached end of stream (returned -1)"); + } + + return pktlen; + } + + @Override + public void readFields(DataInput in) throws IOException { + /* + * extract pkt len. + * + * GPSQL-1107: + * The DataInput might already be empty (EOF), but we can't check it beforehand. + * If that's the case, pktlen is updated to -1, to mark that the object is still empty. + * (can be checked with isEmpty()). + */ + pktlen = readPktLen(in); + if (isEmpty()) { + return; + } + + /* extract the version and col cnt */ + int version = in.readShort(); + int curOffset = 4 + 2; + int colCnt; + + /* !!! Check VERSION !!! 
*/ + if (version != GPDBWritable.VERSION && version != GPDBWritable.PREV_VERSION) { + throw new IOException("Current GPDBWritable version(" + + GPDBWritable.VERSION + ") does not match input version(" + + version + ")"); + } + + if (version == GPDBWritable.VERSION) { + errorFlag = in.readByte(); + curOffset += 1; + } + + colCnt = in.readShort(); + curOffset += 2; + + /* Extract Column Type */ + colType = new int[colCnt]; + DBType[] coldbtype = new DBType[colCnt]; + for (int i = 0; i < colCnt; i++) { + int enumType = (in.readByte()); + curOffset += 1; + if (enumType == DBType.BIGINT.ordinal()) { + colType[i] = BIGINT.getOID(); + coldbtype[i] = DBType.BIGINT; + } else if (enumType == DBType.BOOLEAN.ordinal()) { + colType[i] = BOOLEAN.getOID(); + coldbtype[i] = DBType.BOOLEAN; + } else if (enumType == DBType.FLOAT8.ordinal()) { + colType[i] = FLOAT8.getOID(); + coldbtype[i] = DBType.FLOAT8; + } else if (enumType == DBType.INTEGER.ordinal()) { + colType[i] = INTEGER.getOID(); + coldbtype[i] = DBType.INTEGER; + } else if (enumType == DBType.REAL.ordinal()) { + colType[i] = REAL.getOID(); + coldbtype[i] = DBType.REAL; + } else if (enumType == DBType.SMALLINT.ordinal()) { + colType[i] = SMALLINT.getOID(); + coldbtype[i] = DBType.SMALLINT; + } else if (enumType == DBType.BYTEA.ordinal()) { + colType[i] = BYTEA.getOID(); + coldbtype[i] = DBType.BYTEA; + } else if (enumType == DBType.TEXT.ordinal()) { + colType[i] = TEXT.getOID(); + coldbtype[i] = DBType.TEXT; + } else { + throw new IOException("Unknown GPDBWritable.DBType ordinal value"); + } + } + + /* Extract null bit array */ + byte[] nullbytes = new byte[getNullByteArraySize(colCnt)]; + in.readFully(nullbytes); + curOffset += nullbytes.length; + boolean[] colIsNull = byteArrayToBooleanArray(nullbytes, colCnt); + + /* extract column value */ + colValue = new Object[colCnt]; + for (int i = 0; i < colCnt; i++) { + if (!colIsNull[i]) { + /* Skip the alignment padding */ + int skipbytes = roundUpAlignment(curOffset, 
coldbtype[i].getAlignment()) - curOffset; + for (int j = 0; j < skipbytes; j++) { + in.readByte(); + } + curOffset += skipbytes; + + /* For fixed length type, increment the offset according to type type length here. + * For var length type (BYTEA, TEXT), we'll read 4 byte length header and the + * actual payload. + */ + int varcollen = -1; + if (coldbtype[i].isVarLength()) { + varcollen = in.readInt(); + curOffset += 4 + varcollen; + } else { + curOffset += coldbtype[i].getTypeLength(); + } + + switch (DataType.get(colType[i])) { + case BIGINT: { + colValue[i] = in.readLong(); + break; + } + case BOOLEAN: { + colValue[i] = in.readBoolean(); + break; + } + case FLOAT8: { + colValue[i] = in.readDouble(); + break; + } + case INTEGER: { + colValue[i] = in.readInt(); + break; + } + case REAL: { + colValue[i] = in.readFloat(); + break; + } + case SMALLINT: { + colValue[i] = in.readShort(); + break; + } + + /* For BYTEA column, it has a 4 byte var length header. */ + case BYTEA: { + colValue[i] = new byte[varcollen]; + in.readFully((byte[]) colValue[i]); + break; + } + /* For text formatted column, it has a 4 byte var length header + * and it's always null terminated string. + * So, we can remove the last "\0" when constructing the string. 
+ */ + case TEXT: { + byte[] data = new byte[varcollen]; + in.readFully(data, 0, varcollen); + colValue[i] = new String(data, 0, varcollen - 1, CHARSET); + break; + } + + default: + throw new IOException("Unknown GPDBWritable ColType"); + } + } + } + + /* Skip the ending alignment padding */ + int skipbytes = roundUpAlignment(curOffset, 8) - curOffset; + for (int j = 0; j < skipbytes; j++) { + in.readByte(); + } + curOffset += skipbytes; + + if (errorFlag != 0) { + throw new IOException("Received error value " + errorFlag + " from format"); + } + } + + @Override + public void write(DataOutput out) throws IOException { + int numCol = colType.length; + boolean[] nullBits = new boolean[numCol]; + int[] colLength = new int[numCol]; + byte[] enumType = new byte[numCol]; + int[] padLength = new int[numCol]; + byte[] padbytes = new byte[8]; + + /** + * Compute the total payload and header length + * header = total length (4 byte), Version (2 byte), Error (1 byte), #col (2 byte) + * col type array = #col * 1 byte + * null bit array = ceil(#col/8) + */ + int datlen = 4 + 2 + 1 + 2; + datlen += numCol; + datlen += getNullByteArraySize(numCol); + + for (int i = 0; i < numCol; i++) { + /* Get the enum type */ + DBType coldbtype; + switch (DataType.get(colType[i])) { + case BIGINT: + coldbtype = DBType.BIGINT; + break; + case BOOLEAN: + coldbtype = DBType.BOOLEAN; + break; + case FLOAT8: + coldbtype = DBType.FLOAT8; + break; + case INTEGER: + coldbtype = DBType.INTEGER; + break; + case REAL: + coldbtype = DBType.REAL; + break; + case SMALLINT: + coldbtype = DBType.SMALLINT; + break; + case BYTEA: + coldbtype = DBType.BYTEA; + break; + default: + coldbtype = DBType.TEXT; + } + enumType[i] = (byte) (coldbtype.ordinal()); + + /* Get the actual value, and set the null bit */ + if (colValue[i] == null) { + nullBits[i] = true; + colLength[i] = 0; + } else { + nullBits[i] = false; + + /* + * For fixed length type, we get the fixed length. 
+ * For var len binary format, the length is in the col value. + * For text format, we must convert encoding first. + */ + if (!coldbtype.isVarLength()) { + colLength[i] = coldbtype.getTypeLength(); + } else if (!isTextForm(colType[i])) { + colLength[i] = ((byte[]) colValue[i]).length; + } else { + colLength[i] = ((String) colValue[i]).getBytes(CHARSET).length; + } + + /* calculate and add the type alignment padding */ + padLength[i] = roundUpAlignment(datlen, coldbtype.getAlignment()) - datlen; + datlen += padLength[i]; + + /* for variable length type, we add a 4 byte length header */ + if (coldbtype.isVarLength()) { + datlen += 4; + } + } + datlen += colLength[i]; + } + + /* + * Add the final alignment padding for the next record + */ + int endpadding = roundUpAlignment(datlen, 8) - datlen; + datlen += endpadding; + + /* Construct the packet header */ + out.writeInt(datlen); + out.writeShort(VERSION); + out.writeByte(errorFlag); + out.writeShort(numCol); + + /* Write col type */ + for (int i = 0; i < numCol; i++) { + out.writeByte(enumType[i]); + } + + /* Nullness */ + byte[] nullBytes = boolArrayToByteArray(nullBits); + out.write(nullBytes); + + /* Column Value */ + for (int i = 0; i < numCol; i++) { + if (!nullBits[i]) { + /* Pad the alignment byte first */ + if (padLength[i] > 0) { + out.write(padbytes, 0, padLength[i]); + } + + /* Now, write the actual column value */ + switch (DataType.get(colType[i])) { + case BIGINT: + out.writeLong(((Long) colValue[i])); + break; + case BOOLEAN: + out.writeBoolean(((Boolean) colValue[i])); + break; + case FLOAT8: + out.writeDouble(((Double) colValue[i])); + break; + case INTEGER: + out.writeInt(((Integer) colValue[i])); + break; + case REAL: + out.writeFloat(((Float) colValue[i])); + break; + case SMALLINT: + out.writeShort(((Short) colValue[i])); + break; + + /* For BYTEA format, add 4byte length header at the beginning */ + case BYTEA: + out.writeInt(colLength[i]); + out.write((byte[]) colValue[i]); + break; + + /* For 
text format, add 4byte length header. string is already '\0' terminated */ + default: { + out.writeInt(colLength[i]); + byte[] data = ((String) colValue[i]).getBytes(CHARSET); + out.write(data); + break; + } + } + } + } + + /* End padding */ + out.write(padbytes, 0, endpadding); + } + + /** + * Private helper to convert boolean array to byte array + */ + private static byte[] boolArrayToByteArray(boolean[] data) { + int len = data.length; + byte[] byts = new byte[getNullByteArraySize(len)]; + + for (int i = 0, j = 0, k = 7; i < data.length; i++) { + byts[j] |= (data[i] ? 1 : 0) << k--; + if (k < 0) { + j++; + k = 7; + } + } + return byts; + } + + /** + * Private helper to determine the size of the null byte array + */ + private static int getNullByteArraySize(int colCnt) { + return (colCnt / 8) + (colCnt % 8 != 0 ? 1 : 0); + } + + /** + * Private helper to convert byte array to boolean array + */ + private static boolean[] byteArrayToBooleanArray(byte[] data, int colCnt) { + boolean[] bools = new boolean[colCnt]; + for (int i = 0, j = 0, k = 7; i < bools.length; i++) { + bools[i] = ((data[j] >> k--) & 0x01) == 1; + if (k < 0) { + j++; + k = 7; + } + } + return bools; + } + + /** + * Private helper to round up alignment for the given length + */ + private int roundUpAlignment(int len, int align) { + int commonAlignment = align; + if (commonAlignment == 8) { + commonAlignment = alignmentOfEightBytes; + } + return (((len) + ((commonAlignment) - 1)) & ~((commonAlignment) - 1)); + } + + /** + * Getter/Setter methods to get/set the column value + */ + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setLong(int colIdx, Long val) + throws TypeMismatchException { + checkType(BIGINT, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. 
+ * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setBoolean(int colIdx, Boolean val) + throws TypeMismatchException { + checkType(BOOLEAN, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setBytes(int colIdx, byte[] val) + throws TypeMismatchException { + checkType(BYTEA, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setString(int colIdx, String val) + throws TypeMismatchException { + checkType(TEXT, colIdx, true); + if (val != null) { + colValue[colIdx] = val + "\0"; + } else { + colValue[colIdx] = val; + } + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setFloat(int colIdx, Float val) + throws TypeMismatchException { + checkType(REAL, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setDouble(int colIdx, Double val) + throws TypeMismatchException { + checkType(FLOAT8, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. 
+ * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setInt(int colIdx, Integer val) + throws TypeMismatchException { + checkType(INTEGER, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setShort(int colIdx, Short val) + throws TypeMismatchException { + checkType(SMALLINT, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Long getLong(int colIdx) + throws TypeMismatchException { + checkType(BIGINT, colIdx, false); + return (Long) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Boolean getBoolean(int colIdx) + throws TypeMismatchException { + checkType(BOOLEAN, colIdx, false); + return (Boolean) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public byte[] getBytes(int colIdx) + throws TypeMismatchException { + checkType(BYTEA, colIdx, false); + return (byte[]) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public String getString(int colIdx) + throws TypeMismatchException { + checkType(TEXT, colIdx, false); + return (String) colValue[colIdx]; + } + + /** + * Gets the column value of the record. 
+ * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Float getFloat(int colIdx) + throws TypeMismatchException { + checkType(REAL, colIdx, false); + return (Float) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Double getDouble(int colIdx) + throws TypeMismatchException { + checkType(FLOAT8, colIdx, false); + return (Double) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Integer getInt(int colIdx) + throws TypeMismatchException { + checkType(INTEGER, colIdx, false); + return (Integer) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Short getShort(int colIdx) + throws TypeMismatchException { + checkType(SMALLINT, colIdx, false); + return (Short) colValue[colIdx]; + } + + /** + * Sets the error field. + * + * @param errorVal the error value + */ + public void setError(boolean errorVal) { + errorFlag = errorVal ? (byte) 1 : (byte) 0; + } + + /** + * Returns a string representation of the object. + */ + @Override + public String toString() { + if (colType == null) { + return null; + } + StringBuilder result = new StringBuilder(); + for (int i = 0; i < colType.length; i++) { + result.append("Column ").append(i).append(":"); + if (colValue[i] != null) { + result.append(colType[i] == BYTEA.getOID() + ? 
byteArrayInString((byte[]) colValue[i]) + : colValue[i]); + } + result.append("\n"); + } + return result.toString(); + } + + /** + * Helper printing function + */ + private static String byteArrayInString(byte[] data) { + StringBuilder result = new StringBuilder(); + for (Byte b : data) { + result.append(b.intValue()).append(" "); + } + return result.toString(); + } + + /** + * Private Helper to check the type mismatch + * If the expected type is stored as string, then it must be set + * via setString. + * Otherwise, the type must match. + */ + private void checkType(DataType inTyp, int idx, boolean isSet) + throws TypeMismatchException { + if (idx < 0 || idx >= colType.length) { + throw new TypeMismatchException("Column index is out of range"); + } + + int exTyp = colType[idx]; + + if (isTextForm(exTyp)) { + if (inTyp != TEXT) { + throw new TypeMismatchException(formErrorMsg(inTyp.getOID(), TEXT.getOID(), isSet)); + } + } else if (inTyp != DataType.get(exTyp)) { + throw new TypeMismatchException(formErrorMsg(inTyp.getOID(), exTyp, isSet)); + } + } + + private String formErrorMsg(int inTyp, int colTyp, boolean isSet) { + return isSet + ? "Cannot set " + getTypeName(inTyp) + " to a " + getTypeName(colTyp) + " column" + : "Cannot get " + getTypeName(inTyp) + " from a " + getTypeName(colTyp) + " column"; + } + + /** + * Private Helper routine to tell whether a type is Text form or not + * + * @param type the type OID that we want to check + */ + private boolean isTextForm(int type) { + return !Arrays.asList(BIGINT, BOOLEAN, BYTEA, FLOAT8, INTEGER, REAL, SMALLINT).contains(DataType.get(type)); + } + + /** + * Helper to get the type name. + * If a given oid is not in the commonly used list, we + * would expect a TEXT for it (for the error message). 
+ * + * @param oid type OID + * @return type name + */ + public static String getTypeName(int oid) { + switch (DataType.get(oid)) { + case BOOLEAN: + return "BOOLEAN"; + case BYTEA: + return "BYTEA"; + case CHAR: + return "CHAR"; + case BIGINT: + return "BIGINT"; + case SMALLINT: + return "SMALLINT"; + case INTEGER: + return "INTEGER"; + case TEXT: + return "TEXT"; + case REAL: + return "REAL"; + case FLOAT8: + return "FLOAT8"; + case BPCHAR: + return "BPCHAR"; + case VARCHAR: + return "VARCHAR"; + case DATE: + return "DATE"; + case TIME: + return "TIME"; + case TIMESTAMP: + return "TIMESTAMP"; + case NUMERIC: + return "NUMERIC"; + default: + return "TEXT"; + } + } + + /* + * Get alignment from command line to match to the alignment + * the C code uses (see gphdfs/src/protocol_formatter/common.c). + */ + private void initializeEightByteAlignment() { + String alignment = System.getProperty("greenplum.alignment"); + if (alignment == null) { + return; + } + alignmentOfEightBytes = Integer.parseInt(alignment); + } + + /** + * Returns if the writable object is empty, + * based on the pkt len as read from stream. + * -1 means nothing was read (eof). + * + * @return whether the writable object is empty + */ + public boolean isEmpty() { + return pktlen == EOF; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Text.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Text.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Text.java new file mode 100644 index 0000000..253b525 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Text.java @@ -0,0 +1,399 @@ +package org.apache.hawq.pxf.service.io; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.*; +import java.util.Arrays; + +/** + * This class stores text using standard UTF8 encoding. It provides methods to + * serialize, deserialize. The type of length is integer and is serialized using + * zero-compressed format. 
+ */
+public class Text implements Writable {
+
+    // Scratch buffer used by readFields() to collect a line in BUF_SIZE
+    // chunks. The no-arg constructor allocates it; readFields() allocates it
+    // on demand for objects built by the other constructors.
+    private byte[] buf;
+    private static final Log LOG = LogFactory.getLog(Text.class);
+    // Write position within buf while readFields() is running.
+    int curLoc;
+    private static final char LINE_DELIMITER = '\n';
+    private static final int BUF_SIZE = 1024;
+    private static final int EOF = -1;
+
+    private static final byte[] EMPTY_BYTES = new byte[0];
+    // Per-thread UTF-8 coders (CharsetEncoder/CharsetDecoder instances are not
+    // thread-safe). They default to REPORT; encode()/decode() switch them to
+    // REPLACE temporarily and always switch them back.
+    private static final ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal<CharsetEncoder>() {
+        @Override
+        protected CharsetEncoder initialValue() {
+            return Charset.forName("UTF-8").newEncoder().onMalformedInput(
+                    CodingErrorAction.REPORT).onUnmappableCharacter(
+                    CodingErrorAction.REPORT);
+        }
+    };
+    private static final ThreadLocal<CharsetDecoder> DECODER_FACTORY = new ThreadLocal<CharsetDecoder>() {
+        @Override
+        protected CharsetDecoder initialValue() {
+            return Charset.forName("UTF-8").newDecoder().onMalformedInput(
+                    CodingErrorAction.REPORT).onUnmappableCharacter(
+                    CodingErrorAction.REPORT);
+        }
+    };
+    // Backing storage; only the first 'length' bytes are valid content.
+    private byte[] bytes;
+    private int length;
+
+    /**
+     * Creates an empty Text with a read buffer ready for
+     * {@link #readFields(DataInput)}.
+     */
+    public Text() {
+        bytes = EMPTY_BYTES;
+        buf = new byte[BUF_SIZE];
+    }
+
+    /**
+     * Construct from a string.
+     *
+     * @param string input string
+     */
+    public Text(String string) {
+        set(string);
+    }
+
+    /**
+     * Construct from another text.
+     *
+     * @param utf8 text to copy
+     */
+    public Text(Text utf8) {
+        set(utf8);
+    }
+
+    /**
+     * Construct from a byte array.
+     *
+     * @param utf8 input byte array
+     */
+    public Text(byte[] utf8) {
+        set(utf8);
+    }
+
+    /**
+     * Returns whether the first byte of a variable-length encoded integer marks
+     * a negative value. NOTE(review): the encoding looks like Hadoop's
+     * WritableUtils vint/vlong format - confirm before relying on that.
+     *
+     * @param value first byte of the encoded integer
+     * @return true if the encoded value is negative
+     */
+    public static boolean isNegativeVInt(byte value) {
+        return value < -120 || (value >= -112 && value < 0);
+    }
+
+    /**
+     * Reads a variable-length encoded long: one size/sign byte followed by up
+     * to eight big-endian magnitude bytes. A value that fits in the first byte
+     * is returned as-is; negative values are stored as their one's complement.
+     *
+     * @param stream input to read from
+     * @return the decoded value
+     * @throws IOException if reading from the stream fails
+     */
+    public static long readVLong(DataInput stream) throws IOException {
+        byte firstByte = stream.readByte();
+        int len = decodeVIntSize(firstByte);
+        if (len == 1) {
+            return firstByte;
+        }
+        long i = 0;
+        for (int idx = 0; idx < len - 1; idx++) {
+            byte b = stream.readByte();
+            i = i << 8;
+            i = i | (b & 0xFF);
+        }
+        return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
+    }
+
+    /**
+     * Returns the total encoded size in bytes (including the first byte) of a
+     * variable-length integer that starts with {@code value}.
+     *
+     * @param value first byte of the encoded integer
+     * @return number of bytes in the encoding, between 1 and 9
+     */
+    public static int decodeVIntSize(byte value) {
+        if (value >= -112) {
+            return 1;
+        } else if (value < -120) {
+            return -119 - value;
+        }
+        return -111 - value;
+    }
+
+    /**
+     * Decodes a UTF-8 byte range, replacing malformed input with U+FFFD.
+     *
+     * @param utf8 UTF-8 encoded byte array
+     * @param start start point
+     * @param length number of bytes to decode
+     * @return decoded string
+     * @throws CharacterCodingException if the conversion failed
+     */
+    public static String decode(byte[] utf8, int start, int length)
+            throws CharacterCodingException {
+        return decode(ByteBuffer.wrap(utf8, start, length), true);
+    }
+
+    /**
+     * Converts the provided byte array to a String using the UTF-8 encoding. If
+     * <code>replace</code> is true, then malformed input is replaced with the
+     * substitution character, which is U+FFFD. Otherwise the method throws a
+     * MalformedInputException.
+     *
+     * @param utf8 UTF-8 encoded byte array
+     * @param start start point
+     * @param length length of array
+     * @param replace whether to replace malformed input with substitution
+     *            character
+     * @return decoded string
+     * @throws MalformedInputException if a malformed input is used
+     * @throws CharacterCodingException if the conversion failed
+     */
+    public static String decode(byte[] utf8, int start, int length,
+                                boolean replace)
+            throws CharacterCodingException {
+        return decode(ByteBuffer.wrap(utf8, start, length), replace);
+    }
+
+    private static String decode(ByteBuffer utf8, boolean replace)
+            throws CharacterCodingException {
+        CharsetDecoder decoder = DECODER_FACTORY.get();
+        if (replace) {
+            decoder.onMalformedInput(CodingErrorAction.REPLACE);
+            decoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
+        }
+        try {
+            return decoder.decode(utf8).toString();
+        } finally {
+            // Restore the thread-local decoder to its default (REPORT) even if
+            // decoding threw, so later strict callers on this thread are not
+            // silently switched to REPLACE.
+            if (replace) {
+                decoder.onMalformedInput(CodingErrorAction.REPORT);
+                decoder.onUnmappableCharacter(CodingErrorAction.REPORT);
+            }
+        }
+    }
+
+    /**
+     * Converts the provided String to bytes using the UTF-8 encoding. If the
+     * input is malformed, invalid chars are replaced by a default value.
+     *
+     * @param string string to encode
+     * @return ByteBuffer: bytes stores at ByteBuffer.array() and length is
+     *         ByteBuffer.limit()
+     * @throws CharacterCodingException if conversion failed
+     */
+    public static ByteBuffer encode(String string)
+            throws CharacterCodingException {
+        return encode(string, true);
+    }
+
+    /**
+     * Converts the provided String to bytes using the UTF-8 encoding. If
+     * <code>replace</code> is true, then malformed input is replaced with the
+     * substitution character, which is U+FFFD. Otherwise the method throws a
+     * MalformedInputException.
+     *
+     * @param string string to encode
+     * @param replace whether to replace malformed input with substitution
+     *            character
+     * @return ByteBuffer: bytes stores at ByteBuffer.array() and length is
+     *         ByteBuffer.limit()
+     * @throws MalformedInputException if a malformed input is used
+     * @throws CharacterCodingException if the conversion failed
+     */
+    public static ByteBuffer encode(String string, boolean replace)
+            throws CharacterCodingException {
+        CharsetEncoder encoder = ENCODER_FACTORY.get();
+        if (replace) {
+            encoder.onMalformedInput(CodingErrorAction.REPLACE);
+            encoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
+        }
+        try {
+            // wrap the String directly instead of copying it via toCharArray()
+            return encoder.encode(CharBuffer.wrap(string));
+        } finally {
+            // always restore the thread-local encoder to REPORT
+            if (replace) {
+                encoder.onMalformedInput(CodingErrorAction.REPORT);
+                encoder.onUnmappableCharacter(CodingErrorAction.REPORT);
+            }
+        }
+    }
+
+    /**
+     * Returns the raw bytes; however, only data up to {@link #getLength()} is
+     * valid.
+     *
+     * @return raw bytes of byte array
+     */
+    public byte[] getBytes() {
+        return bytes;
+    }
+
+    /**
+     * Returns the number of bytes in the byte array
+     *
+     * @return number of bytes in byte array
+     */
+    public int getLength() {
+        return length;
+    }
+
+    /**
+     * Sets to contain the contents of a string.
+     *
+     * @param string input string
+     */
+    public void set(String string) {
+        try {
+            ByteBuffer bb = encode(string, true);
+            bytes = bb.array();
+            length = bb.limit();
+        } catch (CharacterCodingException e) {
+            throw new RuntimeException("Should not have happened "
+                    + e.toString());
+        }
+    }
+
+    /**
+     * Sets to a UTF-8 byte array.
+     *
+     * @param utf8 input UTF-8 byte array
+     */
+    public void set(byte[] utf8) {
+        set(utf8, 0, utf8.length);
+    }
+
+    /**
+     * Copies a text.
+     *
+     * @param other text object to copy.
+     */
+    public void set(Text other) {
+        set(other.getBytes(), 0, other.getLength());
+    }
+
+    /**
+     * Sets the Text to range of bytes.
+     *
+     * @param utf8 the data to copy from
+     * @param start the first position of the new string
+     * @param len the number of bytes of the new string
+     */
+    public void set(byte[] utf8, int start, int len) {
+        setCapacity(len, false);
+        System.arraycopy(utf8, start, bytes, 0, len);
+        this.length = len;
+    }
+
+    /**
+     * Appends a range of bytes to the end of the given text.
+     *
+     * @param utf8 the data to copy from
+     * @param start the first position to append from utf8
+     * @param len the number of bytes to append
+     */
+    public void append(byte[] utf8, int start, int len) {
+        setCapacity(length + len, true);
+        System.arraycopy(utf8, start, bytes, length, len);
+        length += len;
+    }
+
+    /**
+     * Clears the string to empty.
+     */
+    public void clear() {
+        length = 0;
+    }
+
+    /*
+     * Sets the capacity of this Text object to <em>at least</em>
+     * <code>len</code> bytes. If the current buffer is longer, then the
+     * capacity and existing content of the buffer are unchanged. If
+     * <code>len</code> is larger than the current capacity, the Text object's
+     * capacity is increased to at least <code>len</code>; when the old data is
+     * kept (appending), the capacity is at least doubled so that a sequence of
+     * appends copies each byte an amortized constant number of times.
+     *
+     * @param len the number of bytes we need
+     *
+     * @param keepData should the old data be kept
+     */
+    private void setCapacity(int len, boolean keepData) {
+        if (bytes == null || bytes.length < len) {
+            boolean copyOld = bytes != null && keepData;
+            // If doubling overflows int it turns negative and max() falls
+            // back to the exact requested size.
+            int newCapacity = copyOld ? Math.max(len, bytes.length << 1) : len;
+            byte[] newBytes = new byte[newCapacity];
+            if (copyOld) {
+                System.arraycopy(bytes, 0, newBytes, 0, length);
+            }
+            bytes = newBytes;
+        }
+    }
+
+    /**
+     * Convert text back to string
+     *
+     * @see java.lang.Object#toString()
+     */
+    @Override
+    public String toString() {
+        try {
+            return decode(bytes, 0, length);
+        } catch (CharacterCodingException e) {
+            throw new RuntimeException("Should not have happened "
+                    + e.toString());
+        }
+    }
+
+    /**
+     * Writes the valid bytes (no length prefix) to {@code out}.
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        byte[] data = getBytes();
+        out.write(data, 0, getLength());
+    }
+
+    /**
+     * Reads one line from {@code inputStream}, replacing the current contents:
+     * bytes are consumed up to and including the next line delimiter, or until
+     * the end of the stream. Every byte value, including 0xFF, is treated as
+     * data; only a real end of stream stops the read early.
+     *
+     * @param inputStream source; a {@link DataInputStream} is read with
+     *            {@code read()}, any other {@link DataInput} with
+     *            {@code readUnsignedByte()}
+     * @throws IOException if reading from the stream fails
+     */
+    @Override
+    public void readFields(DataInput inputStream) throws IOException {
+        if (buf == null) {
+            // only the no-arg constructor pre-allocates the scratch buffer
+            buf = new byte[BUF_SIZE];
+        }
+
+        // kept as an int: casting to byte first would turn a 0xFF data byte
+        // into -1 and make it indistinguishable from EOF
+        int c;
+        curLoc = 0;
+        clear();
+        while ((c = readByteOrEof(inputStream)) != EOF) {
+            buf[curLoc] = (byte) c;
+            curLoc++;
+
+            if (c == LINE_DELIMITER) {
+                LOG.trace("read one line, size " + (length + curLoc));
+                break;
+            }
+
+            if (isBufferFull()) {
+                flushBuffer();
+            }
+        }
+
+        if (c == EOF && (length > 0 || !isBufferEmpty())) {
+            // data was read but the stream ended without a line break
+            LOG.warn("Stream ended without line break");
+        }
+        if (!isBufferEmpty()) {
+            flushBuffer();
+        }
+    }
+
+    /**
+     * Reads one unsigned byte (0-255), or returns EOF at the end of input.
+     */
+    private static int readByteOrEof(DataInput in) throws IOException {
+        if (in instanceof DataInputStream) {
+            return ((DataInputStream) in).read();
+        }
+        try {
+            return in.readUnsignedByte();
+        } catch (java.io.EOFException e) {
+            return EOF;
+        }
+    }
+
+    private boolean isBufferEmpty() {
+        return (curLoc == 0);
+    }
+
+    private boolean isBufferFull() {
+        return (curLoc == BUF_SIZE);
+    }
+
+    /** Moves the buffered bytes into the Text contents and resets buf. */
+    private void flushBuffer() {
+        append(buf, 0, curLoc);
+        curLoc = 0;
+    }
+
+    /**
+     * Returns true iff <code>o</code> is a Text with the same contents. Only
+     * the first {@link #getLength()} bytes are compared; spare capacity in
+     * the backing array is ignored.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof Text)) {
+            return false;
+        }
+        Text other = (Text) o;
+        if (length != other.length) {
+            return false;
+        }
+        for (int i = 0; i < length; i++) {
+            if (bytes[i] != other.bytes[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Hash of the valid bytes, using the same formula as
+     * {@link Arrays#hashCode(byte[])} so it is consistent with equals().
+     */
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        for (int i = 0; i < length; i++) {
+            hash = 31 * hash + bytes[i];
+        }
+        return hash;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Writable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Writable.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Writable.java
new file mode 100644
index 0000000..038da9c
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Writable.java
@@ -0,0 +1,50 @@
+package org.apache.hawq.pxf.service.io;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A serializable object which implements a simple, efficient, serialization
+ * protocol, based on {@link DataInput} and {@link DataOutput}.
+ */
+public interface Writable {
+
+    /**
+     * Serialize the fields of this object to <code>out</code>.
+     *
+     * @param out <code>DataOutput</code> to serialize this object into.
+     * @throws IOException if I/O error occurs
+     */
+    void write(DataOutput out) throws IOException;
+
+    /**
+     * Deserialize the fields of this object from <code>in</code>.
+     * <p>For efficiency, implementations should attempt to re-use storage in the
+     * existing object where possible.</p>
+     *
+     * @param in <code>DataInput</code> to deserialize this object from.
+     * @throws IOException if I/O error occurs
+     */
+    void readFields(DataInput in) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/package-info.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/package-info.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/package-info.java
new file mode 100644
index 0000000..7084f86
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Provides PXF Rest API resources.
+ */
+package org.apache.hawq.pxf.service;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/BridgeResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/BridgeResource.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/BridgeResource.java
new file mode 100644
index 0000000..3a062c3
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/BridgeResource.java
@@ -0,0 +1,189 @@
+package org.apache.hawq.pxf.service.rest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.servlet.ServletContext;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+
+import org.apache.catalina.connector.ClientAbortException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hawq.pxf.service.Bridge;
+import org.apache.hawq.pxf.service.ReadBridge;
+import org.apache.hawq.pxf.service.ReadSamplingBridge;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.hawq.pxf.service.utilities.SecuredHDFS;
+
+/**
+ * Handles the subpath {@code /<version>/Bridge/} of this REST component:
+ * streams the records of one data fragment back to the client.
+ */
+@Path("/" + Version.PXF_PROTOCOL_VERSION + "/Bridge/")
+public class BridgeResource extends RestResource {
+
+    private static final Log LOG = LogFactory.getLog(BridgeResource.class);
+    /**
+     * Lock is needed here in the case of a non-thread-safe plugin. Using
+     * synchronized methods is not enough because the bridge work is run by the
+     * container through {@link StreamingOutput}, after we have left this
+     * class's context.
+     * <p>
+     * BRIDGE_LOCK is accessed through lock() and unlock() functions, based on
+     * the isThreadSafe parameter that is determined by the bridge.
+     */
+    private static final ReentrantLock BRIDGE_LOCK = new ReentrantLock();
+
+    public BridgeResource() {
+    }
+
+    /**
+     * Used to be HDFSReader. Creates a bridge instance and iterates over its
+     * records, printing it out to outgoing stream. Outputs GPDBWritable or
+     * Text.
+     * <p>
+     * Parameters come through HTTP header. A positive stats sample ratio
+     * selects a {@link ReadSamplingBridge}, otherwise a {@link ReadBridge} is
+     * used.
+     *
+     * @param servletContext Servlet context contains attributes required by
+     *            SecuredHDFS
+     * @param headers Holds HTTP headers from request
+     * @return response object containing stream that will output records
+     * @throws Exception in case of wrong request parameters, or failure to
+     *             initialize bridge
+     */
+    @GET
+    @Produces(MediaType.APPLICATION_OCTET_STREAM)
+    public Response read(@Context final ServletContext servletContext,
+                         @Context HttpHeaders headers) throws Exception {
+        // Convert headers into a regular map
+        Map<String, String> params = convertToCaseInsensitiveMap(headers.getRequestHeaders());
+
+        // NOTE(review): this logs every request header, which may include the
+        // delegation token - consider masking it.
+        LOG.debug("started with parameters: " + params);
+
+        ProtocolData protData = new ProtocolData(params);
+        SecuredHDFS.verifyToken(protData, servletContext);
+        Bridge bridge;
+        float sampleRatio = protData.getStatsSampleRatio();
+        if (sampleRatio > 0) {
+            bridge = new ReadSamplingBridge(protData);
+        } else {
+            bridge = new ReadBridge(protData);
+        }
+        String dataDir = protData.getDataSource();
+        // THREAD-SAFE parameter has precedence
+        boolean isThreadSafe = protData.isThreadSafe() && bridge.isThreadSafe();
+        LOG.debug("Request for " + dataDir + " will be handled "
+                + (isThreadSafe ? "without" : "with") + " synchronization");
+
+        return readResponse(bridge, protData, isThreadSafe);
+    }
+
+    /**
+     * Builds the streaming response that iterates {@code bridge} and writes
+     * every record to the HTTP output stream. When {@code threadSafe} is false
+     * the whole iteration runs under {@link #BRIDGE_LOCK}.
+     *
+     * @param bridge initialized bridge to read records from
+     * @param protData request protocol data (fragment index, data source)
+     * @param threadSafe whether the bridge can run without the global lock
+     * @return 200 response whose entity streams the records
+     */
+    Response readResponse(final Bridge bridge, ProtocolData protData,
+                          final boolean threadSafe) {
+        final int fragment = protData.getDataFragment();
+        final String dataDir = protData.getDataSource();
+
+        // Creating an internal streaming class
+        // which will iterate the records and put them on the
+        // output stream
+        final StreamingOutput streaming = new StreamingOutput() {
+            @Override
+            public void write(final OutputStream out) throws IOException,
+                    WebApplicationException {
+                long recordCount = 0;
+
+                if (!threadSafe) {
+                    lock(dataDir);
+                }
+                try {
+
+                    if (!bridge.beginIteration()) {
+                        return;
+                    }
+
+                    Writable record;
+                    DataOutputStream dos = new DataOutputStream(out);
+                    LOG.debug("Starting streaming fragment " + fragment
+                            + " of resource " + dataDir);
+                    while ((record = bridge.getNext()) != null) {
+                        record.write(dos);
+                        ++recordCount;
+                    }
+                    LOG.debug("Finished streaming fragment " + fragment
+                            + " of resource " + dataDir + ", " + recordCount
+                            + " records.");
+                } catch (ClientAbortException e) {
+                    // Occurs whenever the client (HAWQ) decides to end the
+                    // connection; deliberately not rethrown.
+                    LOG.error("Remote connection closed by HAWQ", e);
+                } catch (Exception e) {
+                    LOG.error("Exception thrown when streaming", e);
+                    // keep the original exception as the cause
+                    throw new IOException(e.getMessage(), e);
+                } finally {
+                    LOG.debug("Stopped streaming fragment " + fragment
+                            + " of resource " + dataDir + ", " + recordCount
+                            + " records.");
+                    if (!threadSafe) {
+                        unlock(dataDir);
+                    }
+                }
+            }
+        };
+
+        return Response.ok(streaming, MediaType.APPLICATION_OCTET_STREAM).build();
+    }
+
+    /**
+     * Locks BRIDGE_LOCK
+     *
+     * @param path path for the request, used for logging.
+     */
+    private void lock(String path) {
+        LOG.trace("Locking BridgeResource for " + path);
+        BRIDGE_LOCK.lock();
+        LOG.trace("Locked BridgeResource for " + path);
+    }
+
+    /**
+     * Unlocks BRIDGE_LOCK
+     *
+     * @param path path for the request, used for logging.
+     */
+    private void unlock(String path) {
+        LOG.trace("Unlocking BridgeResource for " + path);
+        BRIDGE_LOCK.unlock();
+        LOG.trace("Unlocked BridgeResource for " + path);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/ClusterNodesResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/ClusterNodesResource.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/ClusterNodesResource.java
new file mode 100644
index 0000000..1280c09
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/ClusterNodesResource.java
@@ -0,0 +1,148 @@
+package org.apache.hawq.pxf.service.rest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.catalina.connector.ClientAbortException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+/**
+ * REST resource that reports the live HDFS datanodes of the cluster.
+ * Example for querying API getClusterNodesInfo from a web client
+ * <code>curl "http://localhost:51200/pxf/{version}/HadoopCluster/getNodesInfo"</code>
+ * /pxf/ is made part of the path when there is a webapp by that name in tcServer.
+ */
+@Path("/" + Version.PXF_PROTOCOL_VERSION + "/HadoopCluster/")
+public class ClusterNodesResource {
+    private static final Log LOG = LogFactory.getLog(ClusterNodesResource.class);
+
+    public ClusterNodesResource() {
+    }
+
+    /**
+     * Function queries the Hadoop namenode with the getDataNodeStats API It
+     * gets the host's IP and REST port of every HDFS data node in the cluster.
+     * Then, it packs the results in JSON format and writes to the HTTP response
+     * stream. Response Examples:<br>
+     * <ol>
+     * <li>When there are no datanodes - getDataNodeStats returns an empty array
+     * <code>{"regions":[]}</code></li>
+     * <li>When there are datanodes
+     * <code>{"regions":[{"host":"1.2.3.1","port":50075},{"host":"1.2.3.2","port"
+     * :50075}]}</code></li>
+     * </ol>
+     *
+     * @return JSON response with nodes info
+     * @throws Exception if failed to retrieve info, if a node has an empty IP
+     *             or a non-positive port, or if the default file system is
+     *             not HDFS
+     */
+    @GET
+    @Path("getNodesInfo")
+    @Produces("application/json")
+    public Response read() throws Exception {
+        LOG.debug("getNodesInfo started");
+        StringBuilder jsonOutput = new StringBuilder("{\"regions\":[");
+        try {
+            /*
+             * 1. Initialize the HADOOP client side API for a distributed file
+             * system
+             */
+            Configuration conf = new Configuration();
+            FileSystem fs = FileSystem.get(conf);
+            if (!(fs instanceof DistributedFileSystem)) {
+                // report a misconfigured default FS clearly instead of
+                // failing with a ClassCastException
+                throw new java.io.IOException(
+                        "Default file system is not HDFS: " + fs.getUri());
+            }
+            DistributedFileSystem dfs = (DistributedFileSystem) fs;
+
+            /*
+             * 2. Query the namenode for the datanodes info. Only live nodes are
+             * returned - in accordance with the results returned by
+             * org.apache.hadoop.hdfs.tools.DFSAdmin#report().
+             */
+            DatanodeInfo[] liveNodes = dfs.getDataNodeStats(DatanodeReportType.LIVE);
+
+            /*
+             * 3. Pack the datanodes info in a JSON text format and write it to
+             * the HTTP output stream.
+             */
+            String prefix = "";
+            for (DatanodeInfo node : liveNodes) {
+                verifyNode(node);
+                // write one node to the HTTP stream
+                jsonOutput.append(prefix).append(writeNode(node));
+                prefix = ",";
+            }
+            jsonOutput.append("]}");
+            LOG.debug("getNodesCluster output: " + jsonOutput);
+        } catch (NodeDataException e) {
+            LOG.error("Nodes verification failed", e);
+            throw e;
+        } catch (ClientAbortException e) {
+            LOG.error("Remote connection closed by HAWQ", e);
+            throw e;
+        } catch (java.io.IOException e) {
+            LOG.error("Unhandled exception thrown", e);
+            throw e;
+        }
+
+        return Response.ok(jsonOutput.toString(),
+                MediaType.APPLICATION_JSON_TYPE).build();
+    }
+
+    /**
+     * Signals a datanode entry with invalid data. Declared static so the
+     * (serializable) exception does not capture the enclosing resource.
+     */
+    private static class NodeDataException extends java.io.IOException {
+
+        private static final long serialVersionUID = 1L;
+
+        public NodeDataException(String paramString) {
+            super(paramString);
+        }
+    }
+
+    /**
+     * Rejects a node whose IP is empty or whose info port is not positive.
+     *
+     * @param node datanode to check
+     * @throws NodeDataException if the node data is invalid
+     */
+    private void verifyNode(DatanodeInfo node) throws NodeDataException {
+        int port = node.getInfoPort();
+        String ip = node.getIpAddr();
+
+        if (StringUtils.isEmpty(ip)) {
+            throw new NodeDataException("Invalid IP: " + ip + " (Node " + node
+                    + ")");
+        }
+
+        if (port <= 0) {
+            throw new NodeDataException("Invalid port: " + port + " (Node "
+                    + node + ")");
+        }
+    }
+
+    /**
+     * Formats one node as {@code {"host":"<ip>","port":<infoPort>}}.
+     */
+    String writeNode(DatanodeInfo node) throws java.io.IOException {
+        return "{\"host\":\"" + node.getIpAddr() + "\",\"port\":"
+                + node.getInfoPort() + "}";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/FragmenterResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/FragmenterResource.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/FragmenterResource.java
new file mode 100644
index 0000000..d6e8d49
--- /dev/null
+++
b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/FragmenterResource.java
@@ -0,0 +1,154 @@
+package org.apache.hawq.pxf.service.rest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.FragmentsStats;
+import org.apache.hawq.pxf.service.FragmenterFactory;
+import org.apache.hawq.pxf.service.FragmentsResponse;
+import org.apache.hawq.pxf.service.FragmentsResponseFormatter;
+import org.apache.hawq.pxf.service.utilities.AnalyzeUtils;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.hawq.pxf.service.utilities.SecuredHDFS;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.servlet.ServletContext;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * REST resource listing the data fragments a data resource is split into, so
+ * the resource can be processed in parallel, plus statistics about those
+ * fragments. Example query from a web client:
+ * {@code curl -i "http://localhost:51200/pxf/{version}/Fragmenter/getFragments?path=/dir1/dir2/*txt"}
+ * <code>/pxf/</code> is made part of the path when there is a webapp by that
+ * name in tomcat.
+ */
+@Path("/" + Version.PXF_PROTOCOL_VERSION + "/Fragmenter/")
+public class FragmenterResource extends RestResource {
+    private static final Log LOG = LogFactory.getLog(FragmenterResource.class);
+
+    /**
+     * Serves {@code http://nn:port/pxf/{version}/Fragmenter/getFragments?path=...}.
+     * <p>
+     * Creates a fragmenter from the request headers, asks it for the fragments
+     * of {@code path}, passes them through
+     * {@link AnalyzeUtils#getSampleFragments} and returns the formatted result.
+     *
+     * @param servletContext servlet context holding the attributes SecuredHDFS
+     *            needs
+     * @param headers HTTP headers of the request
+     * @param path URI path option of this request
+     * @return response carrying the JSON-serialized fragments metadata
+     * @throws Exception if the fragments could not be obtained
+     */
+    @GET
+    @Path("getFragments")
+    @Produces("application/json")
+    public Response getFragments(@Context final ServletContext servletContext,
+                                 @Context final HttpHeaders headers,
+                                 @QueryParam("path") final String path)
+            throws Exception {
+        final ProtocolData protocolData = getProtocolData(servletContext, headers, path);
+        final Fragmenter fragmenter = FragmenterFactory.create(protocolData);
+
+        // presumably keeps only a sample of the fragments when the request
+        // asks for sampling - see AnalyzeUtils
+        final List<Fragment> selected = AnalyzeUtils.getSampleFragments(
+                fragmenter.getFragments(), protocolData);
+
+        final FragmentsResponse body = FragmentsResponseFormatter.formatResponse(selected, path);
+        return Response.ok(body, MediaType.APPLICATION_JSON_TYPE).build();
+    }
+
+    /**
+     * Serves {@code http://nn:port/pxf/{version}/Fragmenter/getFragmentsStats?path=...}.
+     *
+     * @param servletContext servlet context holding the attributes SecuredHDFS
+     *            needs
+     * @param headers HTTP headers of the request
+     * @param path URI path option of this request
+     * @return response carrying the JSON-serialized fragments statistics
+     * @throws Exception if the statistics could not be obtained
+     */
+    @GET
+    @Path("getFragmentsStats")
+    @Produces("application/json")
+    public Response getFragmentsStats(@Context final ServletContext servletContext,
+                                      @Context final HttpHeaders headers,
+                                      @QueryParam("path") final String path)
+            throws Exception {
+        final ProtocolData protocolData = getProtocolData(servletContext, headers, path);
+        final FragmentsStats stats = FragmenterFactory.create(protocolData).getFragmentsStats();
+        final String json = FragmentsStats.dataToJSON(stats);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(FragmentsStats.dataToString(stats, path));
+        }
+        return Response.ok(json, MediaType.APPLICATION_JSON_TYPE).build();
+    }
+
+    /**
+     * Builds and validates the protocol data of a request: requires a
+     * fragmenter header, then verifies the security token.
+     */
+    private ProtocolData getProtocolData(final ServletContext servletContext,
+                                         final HttpHeaders headers,
+                                         final String path) throws Exception {
+        if (LOG.isDebugEnabled()) {
+            logRequestStart(headers, path);
+        }
+
+        // header names are looked up case-insensitively from here on
+        final Map<String, String> params = convertToCaseInsensitiveMap(headers.getRequestHeaders());
+        final ProtocolData protocolData = new ProtocolData(params);
+        if (protocolData.getFragmenter() == null) {
+            // presumably throws, rejecting the request
+            protocolData.protocolViolation("fragmenter");
+        }
+        SecuredHDFS.verifyToken(protocolData, servletContext);
+        return protocolData;
+    }
+
+    /** Logs at debug level the requested path and every request header. */
+    private static void logRequestStart(final HttpHeaders headers, final String path) {
+        final StringBuilder message = new StringBuilder("FRAGMENTER started for path \"")
+                .append(path).append("\"");
+        for (final String name : headers.getRequestHeaders().keySet()) {
+            message.append(" Header: ").append(name)
+                    .append(" Value: ").append(headers.getRequestHeader(name));
+        }
+        LOG.debug(message);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/InvalidPathResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/InvalidPathResource.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/InvalidPathResource.java
new file mode 100644
index 0000000..5a9f0d1
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/InvalidPathResource.java
@@ -0,0 +1,179 @@
+package org.apache.hawq.pxf.service.rest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hawq.pxf.api.utilities.Utilities;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.PathSegment;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.core.UriInfo;
+
+/**
+ * Class for catching paths that are not defined by other resources.
+ * NOTE: This resource must be accessible without any security checks
+ * as it is used to verify proper load of the PXF webapp.
+ *
+ * For each path, the version is compared to the current version PXF_VERSION.
+ * The expected format of a path is "{@code http://<host>:<port>/pxf/<version>/<rest of path>}"
+ *
+ * The returned value is always a Server Error code (500).
+ * If the version is different than the current version, an appropriate error is returned with version details.
+ * Otherwise, an error about unknown path is returned.
+ */
+@Path("/")
+public class InvalidPathResource {
+    @Context
+    UriInfo rootUri;
+
+    private static final Log LOG = LogFactory.getLog(InvalidPathResource.class);
+    // Set of retired endpoints; immutable, so shared by all instances
+    private static final ImmutableSet<String> RETIRED_END_POINTS = ImmutableSet.of(
+            "Analyzer");
+    // Well-formed version segment ("v<number>"), compiled once instead of per request
+    private static final Pattern VERSION_PATTERN = Pattern.compile("v[0-9]+");
+
+    public InvalidPathResource() {
+    }
+
+    /**
+     * Catches path /pxf/
+     *
+     * @return error message response
+     */
+    @GET
+    @Path("/")
+    public Response noPathGet() {
+        return noPath();
+    }
+
+    /**
+     * Catches path /pxf/
+     *
+     * @return error message response
+     */
+    @POST
+    @Path("/")
+    public Response noPathPost() {
+        return noPath();
+    }
+
+    private Response noPath() {
+        return sendErrorMessage(getUnknownPathMsg());
+    }
+
+    /**
+     * Catches paths of pattern /pxf/*
+     *
+     * @param path request path
+     * @return error message response
+     */
+    @GET
+    @Path("/{path:.*}")
+    public Response wrongPathGet(@PathParam("path") String path) {
+        return wrongPath(path);
+    }
+
+    /**
+     * Catches paths of pattern /pxf/*
+     *
+     * @param path request path
+     * @return error message response
+     */
+    @POST
+    @Path("/{path:.*}")
+    public Response wrongPathPost(@PathParam("path") String path) {
+        return wrongPath(path);
+    }
+
+    /**
+     * Chooses the error message for an unmatched path from its first two
+     * segments (version and endpoint) and returns it as a 500 response.
+     * The {@code path} argument is unused; segments are read from rootUri.
+     */
+    private Response wrongPath(String path) {
+
+        String errmsg;
+
+        List<PathSegment> pathSegments = rootUri.getPathSegments();
+
+        if (pathSegments.isEmpty()) {
+            return sendErrorMessage(getUnknownPathMsg());
+        }
+
+        String version = pathSegments.get(0).getPath();
+        String endPoint = (pathSegments.size() > 1) ? pathSegments.get(1).getPath() : null;
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("REST request: " + rootUri.getAbsolutePath() + ". " +
+                    "Version " + version + ", supported version is " + Version.PXF_PROTOCOL_VERSION);
+        }
+
+        if (version.equals(Version.PXF_PROTOCOL_VERSION)) { // api with correct version but incorrect path
+            if (RETIRED_END_POINTS.contains(endPoint)) { // api with retired endpoint
+                errmsg = getRetiredPathMsg(endPoint);
+            } else {
+                errmsg = getUnknownPathMsg();
+            }
+        } else if (!VERSION_PATTERN.matcher(version).matches()) { // api with version not of the format "v<number>"
+            errmsg = getUnknownPathMsg();
+        } else { // api with wrong version number
+            errmsg = "Wrong version " + version + ", supported version is " + Version.PXF_PROTOCOL_VERSION;
+        }
+
+        return sendErrorMessage(errmsg);
+    }
+
+    /**
+     * Builds a 500 (server error) response with {@code message} as its
+     * plain-text entity.
+     */
+    private Response sendErrorMessage(String message) {
+        ResponseBuilder b = Response.serverError();
+        b.entity(message);
+        b.type(MediaType.TEXT_PLAIN_TYPE);
+        return b.build();
+    }
+
+    /**
+     * Returns unknown path message, with the path's special characters masked.
+     */
+    private String getUnknownPathMsg() {
+        return "Unknown path \"" + Utilities.maskNonPrintables(rootUri.getAbsolutePath().toString()) + "\"";
+    }
+
+    /**
+     * Returns the message for a recently retired endpoint, e.g.
+     * {@code http://<host>:<port>/pxf/<version>/Analyzer/<rest of path>};
+     * for Analyzer it also names the replacement API.
+     */
+    private String getRetiredPathMsg(String endpoint) {
+        if ("Analyzer".equals(endpoint)) {
+            return endpoint + " API is retired. Please use /Fragmenter/getFragmentsStats instead";
+        } else {
+            return endpoint + " API is retired";
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/MetadataResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/MetadataResource.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/MetadataResource.java
new file mode 100644
index 0000000..3f85bb8
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/MetadataResource.java
@@ -0,0 +1,124 @@
+package org.apache.hawq.pxf.service.rest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import javax.servlet.ServletContext; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.catalina.connector.ClientAbortException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hawq.pxf.api.Metadata; +import org.apache.hawq.pxf.api.MetadataFetcher; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.service.MetadataFetcherFactory; +import org.apache.hawq.pxf.service.MetadataResponse; +import org.apache.hawq.pxf.service.MetadataResponseFormatter; +import org.apache.hawq.pxf.service.utilities.ProtocolData; +import org.apache.hawq.pxf.service.utilities.SecuredHDFS; + +/** + * Class enhances the API of the WEBHDFS REST server. Returns the metadata of a + * given hcatalog table. <br> + * Example for querying API FRAGMENTER from a web client:<br> + * <code>curl -i "http://localhost:51200/pxf/{version}/Metadata/getTableMetadata?table=t1"</code> + * <br> + * /pxf/ is made part of the path when there is a webapp by that name in tomcat. + */ +@Path("/" + Version.PXF_PROTOCOL_VERSION + "/Metadata/") +public class MetadataResource extends RestResource { + private static final Log LOG = LogFactory.getLog(MetadataResource.class); + + public MetadataResource() throws IOException { + } + + /** + * This function queries the underlying store based on the given profile to get schema for items that match the given pattern + * metadata: Item name, field names, field types. The types are converted + * from the underlying types to HAWQ types. + * Unsupported types result in an error. 
<br> + * Response Examples:<br> + * For a table <code>default.t1</code> with 2 fields (a int, b float) will + * be returned as: + * <code>{"PXFMetadata":[{"item":{"path":"default","name":"t1"},"fields":[{"name":"a","type":"int"},{"name":"b","type":"float"}]}]}</code> + * + * @param servletContext servlet context + * @param headers http headers + * @param profile based on this the metadata source can be inferred + * @param pattern table/file name or pattern in the given source + * @return JSON formatted response with metadata of each item that corresponds to the pattern + * @throws Exception if connection to the source/catalog failed, item didn't exist for the pattern + * its type or fields are not supported + */ + @GET + @Path("getMetadata") + @Produces("application/json") + public Response read(@Context final ServletContext servletContext, + @Context final HttpHeaders headers, + @QueryParam("profile") final String profile, + @QueryParam("pattern") final String pattern) + throws Exception { + LOG.debug("getMetadata started"); + String jsonOutput; + try { + + // Convert headers into a regular map + Map<String, String> params = convertToCaseInsensitiveMap(headers.getRequestHeaders()); + + // Add profile and verify token + ProtocolData protData = new ProtocolData(params, profile.toLowerCase()); + + // 0. Verify token + SecuredHDFS.verifyToken(protData, servletContext); + + // 1. start MetadataFetcher + MetadataFetcher metadataFetcher = MetadataFetcherFactory.create(protData); + + // 2. get Metadata + List<Metadata> metadata = metadataFetcher.getMetadata(pattern); + + // 3. 
stream JSON ouptput + MetadataResponse metadataResponse = MetadataResponseFormatter.formatResponse( + metadata, pattern); + + return Response.ok(metadataResponse, MediaType.APPLICATION_JSON_TYPE).build(); + + } catch (ClientAbortException e) { + LOG.error("Remote connection closed by HAWQ", e); + throw e; + } catch (java.io.IOException e) { + LOG.error("Unhandled exception thrown", e); + throw e; + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/RestResource.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/RestResource.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/RestResource.java new file mode 100644 index 0000000..60bb31e --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/RestResource.java @@ -0,0 +1,71 @@ +package org.apache.hawq.pxf.service.rest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import javax.ws.rs.core.MultivaluedMap; + +import org.apache.commons.codec.CharEncoding; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * Super of all PXF REST classes + */ +public abstract class RestResource { + + private static final Log LOG = LogFactory.getLog(RestResource.class); + + /** + * Converts the request headers multivalued map to a case-insensitive + * regular map by taking only first values and storing them in a + * CASE_INSENSITIVE_ORDER TreeMap. All values are converted from ISO_8859_1 + * (ISO-LATIN-1) to UTF_8. + * + * @param requestHeaders request headers multi map. + * @return a regular case-insensitive map. + * @throws UnsupportedEncodingException if the named charsets ISO_8859_1 and + * UTF_8 are not supported + */ + public Map<String, String> convertToCaseInsensitiveMap(MultivaluedMap<String, String> requestHeaders) + throws UnsupportedEncodingException { + Map<String, String> result = new TreeMap<>( + String.CASE_INSENSITIVE_ORDER); + for (Map.Entry<String, List<String>> entry : requestHeaders.entrySet()) { + String key = entry.getKey(); + List<String> values = entry.getValue(); + if (values != null) { + String value = values.get(0); + if (value != null) { + // converting to value UTF-8 encoding + value = new String(value.getBytes(CharEncoding.ISO_8859_1), + CharEncoding.UTF_8); + LOG.trace("key: " + key + ". value: " + value); + result.put(key, value.replace("\\\"", "\"")); + } + } + } + return result; + } +}
