Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1344#discussion_r171042814
  
    --- Diff: 
pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java
 ---
    @@ -0,0 +1,502 @@
    +package org.apache.hawq.pxf.plugins.ignite;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +import java.io.InputStreamReader;
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.UnsupportedEncodingException;
    +import java.net.URL;
    +import java.net.URLEncoder;
    +import java.net.MalformedURLException;
    +import java.net.ProtocolException;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +import com.google.gson.JsonParser;
    +import com.google.gson.JsonElement;
    +import com.google.gson.JsonArray;
    +
    +
    +/**
    + * Ignite database read and write accessor
    + */
    +public class IgniteAccessor extends IgnitePlugin implements ReadAccessor, 
WriteAccessor {
    +    private static final Log LOG = LogFactory.getLog(IgniteAccessor.class);
    +
    +    // Prepared URLs to send to Ignite when reading data
    +    private String urlReadStart = null;
    +    private String urlReadFetch = null;
    +    private String urlReadClose = null;
    +    // Set to true when Ignite reported all the data for the SELECT query 
was retreived
    +    private boolean isLastReadFinished = false;
    +    // A buffer to store the SELECT query results (without Ignite metadata)
    +    private LinkedList<JsonArray> bufferRead = new LinkedList<JsonArray>();
    +
    +    // A template for the INSERT 
    +    private String queryWrite = null;
    +    // Set to true when the INSERT operation is in progress
    +    private boolean isWriteActive = false;
    +    // A buffer to store prepared values for the INSERT query
    +    private LinkedList<OneRow> bufferWrite = new LinkedList<OneRow>();
    +    
    +
    +    /**
    +     * Class constructor.
    +     */
    +    public IgniteAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     */
    +    @Override
    +    public boolean openForRead() throws Exception {
    +        if (bufferSize == 0) {
    +            bufferSize = 1;
    +        }
    +
    +        StringBuilder sb = new StringBuilder();
    +
    +        // Insert a list of fields to be selected
    +        ArrayList<ColumnDescriptor> columns = 
inputData.getTupleDescription();
    +        if (columns == null) {
    +            throw new UserDataException("Tuple description must be 
present.");
    +        }
    +        sb.append("SELECT ");
    +        for (int i = 0; i < columns.size(); i++) {
    +            ColumnDescriptor column = columns.get(i);
    +            if (i > 0) {
    +                sb.append(",");
    +            }
    +            sb.append(column.columnName());
    +        }
    +
    +        // Insert the name of the table to select values from
    +        sb.append(" FROM ");
    +        String tableName = inputData.getDataSource();
    +        if (tableName == null) {
    +            throw new UserDataException("Table name must be set as 
DataSource.");
    +        }
    +        sb.append(tableName);
    +
    +        // Insert query constraints
    +        // Note: Filter constants may be provided separately from the 
query itself, mostly for the safety of the SQL queries; at the moment, however, 
they are passed in the query itself.
    +        ArrayList<String> filterConstants = null;
    +        if (inputData.hasFilter()) {
    +            WhereSQLBuilder filterBuilder = new WhereSQLBuilder(inputData);
    +            String whereSql = filterBuilder.buildWhereSQL();
    +
    +            if (whereSql != null) {
    +                sb.append(" WHERE ").append(whereSql);
    +            }
    +        }
    +
    +        // Insert partition constaints
    +        sb = new 
IgnitePartitionFragmenter(inputData).buildFragmenterSql(sb);
    +
    +        // Format URL
    +        urlReadStart = buildQueryFldexe(sb.toString(), filterConstants);
    +
    +        // Send the first REST request that opens the connection
    +        JsonElement response = sendRestRequest(urlReadStart);
    +
    +        // Build 'urlReadFetch' and 'urlReadClose'
    +        isLastReadFinished = 
response.getAsJsonObject().get("last").getAsBoolean();
    +        urlReadFetch = 
buildQueryFetch(response.getAsJsonObject().get("queryId").getAsInt());
    +        urlReadClose = 
buildQueryCls(response.getAsJsonObject().get("queryId").getAsInt());
    +
    +        LOG.info("Ignite read request. URL: '" + urlReadStart + "'");
    --- End diff --
    
    this should probably be debug level, otherwise each query will leave a log 
trace ?


---

Reply via email to