Github user denalex commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1344#discussion_r171043644
--- Diff:
pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java
---
@@ -0,0 +1,502 @@
+package org.apache.hawq.pxf.plugins.ignite;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.WriteAccessor;
+import org.apache.hawq.pxf.api.UserDataException;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.io.InputStreamReader;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.net.MalformedURLException;
+import java.net.ProtocolException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.gson.JsonParser;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonArray;
+
+
+/**
+ * Ignite database read and write accessor
+ */
+public class IgniteAccessor extends IgnitePlugin implements ReadAccessor,
WriteAccessor {
+ private static final Log LOG = LogFactory.getLog(IgniteAccessor.class);
+
+ // Prepared URLs to send to Ignite when reading data
+ private String urlReadStart = null;
+ private String urlReadFetch = null;
+ private String urlReadClose = null;
+ // Set to true when Ignite reported all the data for the SELECT query
was retrieved
+ private boolean isLastReadFinished = false;
+ // A buffer to store the SELECT query results (without Ignite metadata)
+ private LinkedList<JsonArray> bufferRead = new LinkedList<JsonArray>();
+
+ // A template for the INSERT
+ private String queryWrite = null;
+ // Set to true when the INSERT operation is in progress
+ private boolean isWriteActive = false;
+ // A buffer to store prepared values for the INSERT query
+ private LinkedList<OneRow> bufferWrite = new LinkedList<OneRow>();
+
+
+ /**
+ * Class constructor.
+ */
+ public IgniteAccessor(InputData inputData) throws UserDataException {
+ super(inputData);
+ }
+
+ /**
+ * openForRead() implementation
+ */
+ @Override
+ public boolean openForRead() throws Exception {
+ if (bufferSize == 0) {
+ bufferSize = 1;
+ }
+
+ StringBuilder sb = new StringBuilder();
+
+ // Insert a list of fields to be selected
+ ArrayList<ColumnDescriptor> columns =
inputData.getTupleDescription();
+ if (columns == null) {
+ throw new UserDataException("Tuple description must be
present.");
+ }
+ sb.append("SELECT ");
+ for (int i = 0; i < columns.size(); i++) {
+ ColumnDescriptor column = columns.get(i);
+ if (i > 0) {
+ sb.append(",");
+ }
+ sb.append(column.columnName());
+ }
+
+ // Insert the name of the table to select values from
+ sb.append(" FROM ");
+ String tableName = inputData.getDataSource();
+ if (tableName == null) {
+ throw new UserDataException("Table name must be set as
DataSource.");
+ }
+ sb.append(tableName);
+
+ // Insert query constraints
+ // Note: Filter constants may be provided separately from the
query itself, mostly for the safety of the SQL queries; at the moment, however,
they are passed in the query itself.
+ ArrayList<String> filterConstants = null;
+ if (inputData.hasFilter()) {
+ WhereSQLBuilder filterBuilder = new WhereSQLBuilder(inputData);
+ String whereSql = filterBuilder.buildWhereSQL();
+
+ if (whereSql != null) {
+ sb.append(" WHERE ").append(whereSql);
+ }
+ }
+
+ // Insert partition constraints
+ sb = new
IgnitePartitionFragmenter(inputData).buildFragmenterSql(sb);
+
+ // Format URL
+ urlReadStart = buildQueryFldexe(sb.toString(), filterConstants);
+
+ // Send the first REST request that opens the connection
+ JsonElement response = sendRestRequest(urlReadStart);
+
+ // Build 'urlReadFetch' and 'urlReadClose'
+ isLastReadFinished =
response.getAsJsonObject().get("last").getAsBoolean();
+ urlReadFetch =
buildQueryFetch(response.getAsJsonObject().get("queryId").getAsInt());
+ urlReadClose =
buildQueryCls(response.getAsJsonObject().get("queryId").getAsInt());
+
+ LOG.info("Ignite read request. URL: '" + urlReadStart + "'");
+ return true;
+ }
+
+ /**
+ * readNextObject() implementation
+ */
+ @Override
+ public OneRow readNextObject() throws Exception {
+ if (urlReadFetch == null) {
+ LOG.error("readNextObject(): urlReadFetch is null. This means
the Ignite qryfldexe query was not executed properly");
+ throw new ProtocolException("readNextObject(): urlReadFetch is
null. This means the Ignite qryfldexe query was not executed properly");
+ }
+
+ if (bufferRead.isEmpty()) {
+ // Refill buffer
+ if (isLastReadFinished) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("readNextObject(): All the data received
from Ignite");
+ }
+ return null;
+ }
+
+ JsonElement response = sendRestRequest(urlReadFetch);
+ isLastReadFinished =
response.getAsJsonObject().get("last").getAsBoolean();
+
+ // Parse 'items'
+ Iterator<JsonElement> itemsIterator =
response.getAsJsonObject().get("items").getAsJsonArray().iterator();
+ while (itemsIterator.hasNext()) {
+ if
(!bufferRead.add(itemsIterator.next().getAsJsonArray())) {
+ LOG.error("readNextObject(): Buffer refill failed (not
enough memory in 'bufferRead')");
+ throw new OutOfMemoryError("readNextObject(): not
enough memory in 'bufferRead'");
--- End diff --
I think you should use an application-level exception here; OutOfMemoryError
(OOME) is usually reserved for the JVM itself to report that it cannot allocate more memory.
---