Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171042814 --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java --- @@ -0,0 +1,502 @@ +package org.apache.hawq.pxf.plugins.ignite; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.WriteAccessor; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.io.InputStreamReader; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URL; +import java.net.URLEncoder; +import java.net.MalformedURLException; +import java.net.ProtocolException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.google.gson.JsonParser; +import com.google.gson.JsonElement; +import com.google.gson.JsonArray; + + +/** + * Ignite database read and write accessor + */ +public class IgniteAccessor extends IgnitePlugin implements ReadAccessor, WriteAccessor { + private static final Log LOG = LogFactory.getLog(IgniteAccessor.class); + + // Prepared URLs to send to Ignite when reading data + private String urlReadStart = null; + private String urlReadFetch = null; + private String urlReadClose = null; + // Set to true when Ignite reported all the data for the SELECT query was retreived + private boolean isLastReadFinished = false; + // A buffer to store the SELECT query results (without Ignite metadata) + private LinkedList<JsonArray> bufferRead = new LinkedList<JsonArray>(); + + // A template for the INSERT + private String queryWrite = null; + // Set to true when the INSERT operation is in progress + private boolean isWriteActive = false; + // A buffer to store prepared values for the INSERT query + private LinkedList<OneRow> bufferWrite = new LinkedList<OneRow>(); + + + /** + * Class constructor. 
+ */ + public IgniteAccessor(InputData inputData) throws UserDataException { + super(inputData); + } + + /** + * openForRead() implementation + */ + @Override + public boolean openForRead() throws Exception { + if (bufferSize == 0) { + bufferSize = 1; + } + + StringBuilder sb = new StringBuilder(); + + // Insert a list of fields to be selected + ArrayList<ColumnDescriptor> columns = inputData.getTupleDescription(); + if (columns == null) { + throw new UserDataException("Tuple description must be present."); + } + sb.append("SELECT "); + for (int i = 0; i < columns.size(); i++) { + ColumnDescriptor column = columns.get(i); + if (i > 0) { + sb.append(","); + } + sb.append(column.columnName()); + } + + // Insert the name of the table to select values from + sb.append(" FROM "); + String tableName = inputData.getDataSource(); + if (tableName == null) { + throw new UserDataException("Table name must be set as DataSource."); + } + sb.append(tableName); + + // Insert query constraints + // Note: Filter constants may be provided separately from the query itself, mostly for the safety of the SQL queries; at the moment, however, they are passed in the query itself. 
+ ArrayList<String> filterConstants = null; + if (inputData.hasFilter()) { + WhereSQLBuilder filterBuilder = new WhereSQLBuilder(inputData); + String whereSql = filterBuilder.buildWhereSQL(); + + if (whereSql != null) { + sb.append(" WHERE ").append(whereSql); + } + } + + // Insert partition constaints + sb = new IgnitePartitionFragmenter(inputData).buildFragmenterSql(sb); + + // Format URL + urlReadStart = buildQueryFldexe(sb.toString(), filterConstants); + + // Send the first REST request that opens the connection + JsonElement response = sendRestRequest(urlReadStart); + + // Build 'urlReadFetch' and 'urlReadClose' + isLastReadFinished = response.getAsJsonObject().get("last").getAsBoolean(); + urlReadFetch = buildQueryFetch(response.getAsJsonObject().get("queryId").getAsInt()); + urlReadClose = buildQueryCls(response.getAsJsonObject().get("queryId").getAsInt()); + + LOG.info("Ignite read request. URL: '" + urlReadStart + "'"); --- End diff -- This should probably be logged at debug level; otherwise, each query will leave a trace in the log.
---