Github user denalex commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1344#discussion_r171053913
--- Diff:
pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java
---
@@ -0,0 +1,502 @@
+package org.apache.hawq.pxf.plugins.ignite;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.WriteAccessor;
+import org.apache.hawq.pxf.api.UserDataException;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.io.InputStreamReader;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.net.MalformedURLException;
+import java.net.ProtocolException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.gson.JsonParser;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonArray;
+
+
+/**
+ * Ignite database read and write accessor
+ */
+public class IgniteAccessor extends IgnitePlugin implements ReadAccessor,
WriteAccessor {
+ private static final Log LOG = LogFactory.getLog(IgniteAccessor.class);
+
+ // Prepared URLs to send to Ignite when reading data
+ private String urlReadStart = null;
+ private String urlReadFetch = null;
+ private String urlReadClose = null;
+ // Set to true when Ignite reported all the data for the SELECT query
was retrieved
+ private boolean isLastReadFinished = false;
+ // A buffer to store the SELECT query results (without Ignite metadata)
+ private LinkedList<JsonArray> bufferRead = new LinkedList<JsonArray>();
+
+ // A template for the INSERT
+ private String queryWrite = null;
+ // Set to true when the INSERT operation is in progress
+ private boolean isWriteActive = false;
+ // A buffer to store prepared values for the INSERT query
+ private LinkedList<OneRow> bufferWrite = new LinkedList<OneRow>();
+
+
+ /**
+ * Class constructor.
+ */
+ public IgniteAccessor(InputData inputData) throws UserDataException {
+ super(inputData);
+ }
+
+ /**
+ * openForRead() implementation
+ */
+ @Override
+ public boolean openForRead() throws Exception {
+ if (bufferSize == 0) {
+ bufferSize = 1;
+ }
+
+ StringBuilder sb = new StringBuilder();
+
+ // Insert a list of fields to be selected
+ ArrayList<ColumnDescriptor> columns =
inputData.getTupleDescription();
+ if (columns == null) {
+ throw new UserDataException("Tuple description must be
present.");
+ }
+ sb.append("SELECT ");
+ for (int i = 0; i < columns.size(); i++) {
+ ColumnDescriptor column = columns.get(i);
+ if (i > 0) {
+ sb.append(",");
+ }
+ sb.append(column.columnName());
+ }
+
+ // Insert the name of the table to select values from
+ sb.append(" FROM ");
+ String tableName = inputData.getDataSource();
+ if (tableName == null) {
+ throw new UserDataException("Table name must be set as
DataSource.");
+ }
+ sb.append(tableName);
+
+ // Insert query constraints
+ // Note: Filter constants may be provided separately from the
query itself, mostly for the safety of the SQL queries; at the moment, however,
they are passed in the query itself.
+ ArrayList<String> filterConstants = null;
+ if (inputData.hasFilter()) {
+ WhereSQLBuilder filterBuilder = new WhereSQLBuilder(inputData);
+ String whereSql = filterBuilder.buildWhereSQL();
+
+ if (whereSql != null) {
+ sb.append(" WHERE ").append(whereSql);
+ }
+ }
+
+ // Insert partition constraints
+ sb = new
IgnitePartitionFragmenter(inputData).buildFragmenterSql(sb);
+
+ // Format URL
+ urlReadStart = buildQueryFldexe(sb.toString(), filterConstants);
+
+ // Send the first REST request that opens the connection
+ JsonElement response = sendRestRequest(urlReadStart);
+
+ // Build 'urlReadFetch' and 'urlReadClose'
+ isLastReadFinished =
response.getAsJsonObject().get("last").getAsBoolean();
+ urlReadFetch =
buildQueryFetch(response.getAsJsonObject().get("queryId").getAsInt());
+ urlReadClose =
buildQueryCls(response.getAsJsonObject().get("queryId").getAsInt());
+
+ LOG.info("Ignite read request. URL: '" + urlReadStart + "'");
+ return true;
+ }
+
+ /**
+ * readNextObject() implementation
+ */
+ @Override
+ public OneRow readNextObject() throws Exception {
+ if (urlReadFetch == null) {
+ LOG.error("readNextObject(): urlReadFetch is null. This means
the Ignite qryfldexe query was not executed properly");
+ throw new ProtocolException("readNextObject(): urlReadFetch is
null. This means the Ignite qryfldexe query was not executed properly");
+ }
+
+ if (bufferRead.isEmpty()) {
+ // Refill buffer
+ if (isLastReadFinished) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("readNextObject(): All the data received
from Ignite");
+ }
+ return null;
+ }
+
+ JsonElement response = sendRestRequest(urlReadFetch);
+ isLastReadFinished =
response.getAsJsonObject().get("last").getAsBoolean();
+
+ // Parse 'items'
+ Iterator<JsonElement> itemsIterator =
response.getAsJsonObject().get("items").getAsJsonArray().iterator();
+ while (itemsIterator.hasNext()) {
+ if
(!bufferRead.add(itemsIterator.next().getAsJsonArray())) {
+ LOG.error("readNextObject(): Buffer refill failed (not
enough memory in 'bufferRead')");
+ throw new OutOfMemoryError("readNextObject(): not
enough memory in 'bufferRead'");
+ }
+ }
+
+ // Check again in case "response" contains no elements
+ if (bufferRead.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("readNextObject(): Buffer refill failed");
+ LOG.debug("readNextObject(): All the data received
from Ignite");
+ }
+ return null;
+ }
+ }
+
+ return new OneRow(bufferRead.pollFirst());
+ }
+
+ /**
+ * closeForRead() implementation
+ */
+ @Override
+ public void closeForRead() {
+ if (urlReadClose != null) {
+ try {
+ sendRestRequest(urlReadClose);
+ }
+ catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("closeForRead() Exception: " +
e.getClass().getSimpleName());
+ }
+ }
+ }
+ isLastReadFinished = false;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignite read request finished. URL: '" +
urlReadClose + "'");
+ }
+ }
+
+ /**
+ * openForWrite() implementation.
+ * No queries are sent to Ignite by this procedure, so if there are
some problems (for example, with connection), they will be revealed only during
the execution of 'writeNextObject()'
+ */
+ @Override
+ public boolean openForWrite() throws UserDataException {
+ // This is a temporary solution. At the moment there is no other
way (except for the usage of user-defined parameters) to get the correct name
of Ignite table: GPDB inserts extra data into the address, as required by
Hadoop.
+ // Note that if no extra data is present, the 'definedSource' will
be left unchanged
+ String definedSource = inputData.getDataSource();
+ Pattern pattern = Pattern.compile("/(.*)/[0-9]*-[0-9]*_[0-9]*");
+ Matcher matcher = pattern.matcher(definedSource);
+ if (matcher.find()) {
+ inputData.setDataSource(matcher.group(1));
+ }
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("INSERT INTO ");
+
+ // Insert the table name
+ String tableName = inputData.getDataSource();
+ if (tableName == null) {
+ throw new UserDataException("Table name must be set as
DataSource.");
+ }
+ sb.append(tableName);
+
+ // Insert the column names
+ sb.append("(");
+ ArrayList<ColumnDescriptor> columns =
inputData.getTupleDescription();
+ if (columns == null) {
+ throw new UserDataException("Tuple description must be
present.");
+ }
+ String fieldDivisor = "";
+ for (int i = 0; i < columns.size(); i++) {
+ sb.append(fieldDivisor);
+ fieldDivisor = ", ";
+ sb.append(columns.get(i).columnName());
+ }
+ sb.append(")");
+
+ sb.append(" VALUES ");
+
+ queryWrite = sb.toString();
+ return true;
+ }
+
+ /**
+ * writeNextObject() implementation
+ */
+ @Override
+ public boolean writeNextObject(OneRow currentRow) throws Exception {
+ boolean currentRowInBuffer = bufferWrite.add(currentRow);
+
+ if (!isWriteActive) {
+ if (!currentRowInBuffer) {
+ LOG.error("writeNextObject(): Failed (not enough memory in
'bufferWrite')");
+ throw new OutOfMemoryError("writeNextObject(): not enough
memory in 'bufferWrite'");
+ }
+ LOG.info("Ignite write request. Query: '" + queryWrite + "'");
+ sendInsertRestRequest(queryWrite);
+ bufferWrite.removeFirst();
+ isWriteActive = true;
+ return true;
+ }
+
+ if ((bufferWrite.size() >= bufferSize) || (!currentRowInBuffer)) {
+ sendInsertRestRequest(queryWrite);
+ }
+
+ if (!currentRowInBuffer) {
+ if (!bufferWrite.add(currentRow)) {
+ LOG.error("writeNextObject(): Failed (not enough memory in
'bufferSend')");
+ throw new OutOfMemoryError("writeNextObject(): not enough
memory in 'bufferSend'");
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * closeForWrite() implementation
+ */
+ @Override
+ public void closeForWrite() throws Exception {
+ if (!bufferWrite.isEmpty()) {
+ sendInsertRestRequest(queryWrite);
+ }
+ if (isWriteActive) {
+ // At this point, the request must have finished successfully;
otherwise an exception would be thrown before
+ LOG.info("Ignite write request finished successfully. Query:
'" + queryWrite + "'");
+ }
+ isWriteActive = false;
+ }
+
+ /**
+ * Build HTTP GET query for Ignite REST API with command 'qryfldexe'
+ *
+ * @param querySql SQL query with filter constants. The constants must
be replaced by "?" if 'filterConstants' is not given
+ * @param filterConstants
+ *
+ * @return Prepared HTTP query. The query will be properly encoded
with {@link java.net.URLEncoder}
+ *
+ * @throws UnsupportedEncodingException from {@link
java.net.URLEncoder#encode(String, String)}
+ */
+ private String buildQueryFldexe(String querySql, ArrayList<String>
filterConstants) throws UnsupportedEncodingException {
--- End diff --
The `filterConstants` parameter doesn't have to be an `ArrayList`; just `List`
should be enough.
---