Github user sansanichfb commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/1344#discussion_r171440009
--- Diff:
pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgnitePartitionFragmenter.java
---
@@ -0,0 +1,411 @@
+package org.apache.hawq.pxf.plugins.ignite;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.FragmentsStats;
+import org.apache.hawq.pxf.api.UserDataException;
+import org.apache.hawq.pxf.api.utilities.InputData;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.lang.ArrayUtils;
+
+
+/**
+ * PXF-Ignite fragmenter class
+ *
+ * This fragmenter works just like the one in PXF JDBC plugin
+ */
+public class IgnitePartitionFragmenter extends Fragmenter {
+ /**
+ * Constructs the fragmenter and validates the partitioning parameters
+ * carried by the request.
+ *
+ * The user properties 'PARTITION_BY', 'RANGE' and 'INTERVAL' are read
+ * from {@code inputData}. When 'PARTITION_BY' is absent the constructor
+ * returns right after calling the superclass constructor and assigns
+ * none of the partition fields. Otherwise:
+ * <ul>
+ * <li>'PARTITION_BY' is split on ':' and its second element is resolved
+ * with {@code PartitionType.typeOf};</li>
+ * <li>'RANGE' is required and is split on ':'; a single value is accepted
+ * only for ENUM partitions;</li>
+ * <li>'INTERVAL' is required unless the partition type is ENUM; its first
+ * element is parsed as an integer that must be at least 1, and a unit
+ * is required for DATE partitions;</li>
+ * <li>for DATE partitions both 'RANGE' bounds are parsed with the pattern
+ * 'yyyy-MM-dd' into {@code rangeStart} and {@code rangeEnd}.</li>
+ * </ul>
+ *
+ * NOTE(review): when 'INTERVAL' is absent for an ENUM partition, the
+ * {@code intervalNum < 1} check still runs against the field's initial
+ * value; the field declaration is outside this excerpt - confirm that
+ * its default is at least 1.
+ *
+ * @param inputData request data; passed to the superclass constructor and
+ * queried for the user properties listed above
+ * @throws UserDataException if the request parameter is malformed
+ */
+ public IgnitePartitionFragmenter(InputData inputData) throws
UserDataException {
+ super(inputData);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Constructor started");
+ }
+
+ if (inputData.getUserProperty("PARTITION_BY") == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Constructor successful; partition was not
used");
+ }
+ return;
+ }
+
+ try {
+ partitionBy =
inputData.getUserProperty("PARTITION_BY").split(":");
+ partitionType = PartitionType.typeOf(partitionBy[1]); // partitionBy[1] does not exist when the value has no ':'; that case is caught below
+ }
+ catch (IllegalArgumentException | ArrayIndexOutOfBoundsException
e1) {
+ throw new UserDataException("The parameter 'PARTITION_BY' is
invalid. The pattern is 'column_name:DATE|INT|ENUM'");
+ }
+
+ // parse and validate the 'RANGE' parameter (format: start_value[:end_value])
+ try {
+ String rangeStr = inputData.getUserProperty("RANGE");
+ if (rangeStr != null) {
+ range = rangeStr.split(":");
+ if (range.length == 1 && partitionType !=
PartitionType.ENUM) {
+ throw new UserDataException("The parameter 'RANGE'
does not specify '[:end_value]'");
+ }
+ }
+ else {
+ throw new UserDataException("The parameter 'RANGE' must be
specified along with 'PARTITION_BY'");
+ }
+ }
+ catch (IllegalArgumentException e) {
+ throw new UserDataException("The parameter 'RANGE' is invalid,
the pattern is 'start_value[:end_value]'");
+ }
+
+ // parse and validate the 'INTERVAL' parameter (format: interval_num[:interval_unit])
+ try {
+ String intervalStr = inputData.getUserProperty("INTERVAL");
+ if (intervalStr != null) {
+ interval = intervalStr.split(":");
+ intervalNum = Integer.parseInt(interval[0]); // NumberFormatException is an IllegalArgumentException, so the catch below reports it
+ if (interval.length > 1) {
+ intervalType = IntervalType.typeOf(interval[1]);
+ }
+ if (interval.length == 1 && partitionType ==
PartitionType.DATE) {
+ throw new UserDataException("The parameter 'INTERVAL'
does not specify unit [:year|month|day]");
+ }
+ }
+ else if (partitionType != PartitionType.ENUM) {
+ throw new UserDataException("The parameter 'INTERVAL' must
be specified along with 'PARTITION_BY'");
+ }
+ if (intervalNum < 1) {
+ throw new UserDataException("The parameter 'INTERVAL' must
be at least 1. The actual is '" + intervalNum + "'");
+ }
+ }
+ catch (IllegalArgumentException e) {
+ throw new UserDataException("The parameter 'INTERVAL' invalid.
The pattern is 'interval_num[:interval_unit]'");
+ }
+
+ // for DATE partitions, convert both 'RANGE' bounds into Calendar values
+ try {
+ if (partitionType == PartitionType.DATE) {
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd"); // created per call; parsing is lenient unless setLenient(false) is used
+ rangeStart = Calendar.getInstance();
+ rangeStart.setTime(df.parse(range[0]));
+ rangeEnd = Calendar.getInstance();
+ rangeEnd.setTime(df.parse(range[1])); // range[1] exists: a single-element RANGE was rejected above for non-ENUM types
+ }
+ } catch (ParseException e) {
+ throw new UserDataException("The parameter 'RANGE' has invalid
date format. Expected format is 'yyyy-MM-dd'");
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Constructor successful; some partition used");
+ }
+ }
+
+ /**
+ * Returns statistics for the Ignite table. This is not implemented in
+ * the current version: the method unconditionally throws and never
+ * produces a result.
+ *
+ * @return never returns normally
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public FragmentsStats getFragmentsStats() throws
UnsupportedOperationException {
+ throw new UnsupportedOperationException("ANALYZE for Ignite plugin
is not supported");
+ }
+
+ /**
+ * Returns list of fragments for Ignite table queries
+ *
+ * Every fragment is created with {@code inputData.getDataSource()} as its
+ * source and {@code replicaHostAddressWrapped} as its hosts, is appended
+ * to the {@code fragments} field, and that field is returned. The user
+ * data of every fragment is {@code null}. The fragment metadata depends
+ * on {@code partitionType}:
+ * <ul>
+ * <li>{@code null} (no partitioning): a single fragment with
+ * {@code null} metadata;</li>
+ * <li>DATE: consecutive sub-ranges from {@code rangeStart} up to
+ * {@code rangeEnd}, each {@code intervalNum} days, months or years long
+ * (per {@code intervalType}) and clamped to {@code rangeEnd}; metadata is
+ * the start and end times in milliseconds, each converted with
+ * {@code ByteUtil.getBytes} and concatenated;</li>
+ * <li>INT: consecutive sub-ranges between {@code range[0]} and
+ * {@code range[1]} with step {@code intervalNum}, clamped to the upper
+ * bound; metadata is the start and end values, each converted with
+ * {@code ByteUtil.getBytes} and concatenated;</li>
+ * <li>ENUM: one fragment per element of {@code range}; metadata is the
+ * element's bytes.</li>
+ * </ul>
+ *
+ * NOTE(review): in the INT branch the bounds are parsed with
+ * {@code Integer.parseInt} here rather than in the constructor, so a
+ * non-numeric 'RANGE' surfaces as a NumberFormatException from this
+ * method - confirm that this is intended.
+ *
+ * @throws UnsupportedOperationException if a partition of unknown
+ * type was found
+ *
+ * @return a list of fragments
+ */
+ @Override
+ public List<Fragment> getFragments() throws
UnsupportedOperationException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getFragments() called; dataSource is '" +
inputData.getDataSource() + "'");
+ }
+
+ byte[] fragmentMetadata = null;
+ byte[] fragmentUserdata = null; // never reassigned in this method
+
+ if (partitionType == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getFragments() found no partition");
+ }
+ Fragment fragment = new Fragment(inputData.getDataSource(),
replicaHostAddressWrapped, fragmentMetadata, fragmentUserdata);
+ fragments.add(fragment);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getFragments() successful");
+ }
+ return fragments;
+ }
+
+ switch (partitionType) {
+ case DATE: {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getFragments() found DATE partition");
+ }
+ int currInterval = intervalNum;
+
+ Calendar fragStart = rangeStart; // aliases the field; the object is only read, the loop rebinds this local
+ while (fragStart.before(rangeEnd)) {
+ Calendar fragEnd = (Calendar) fragStart.clone();
+ switch (intervalType) {
+ case DAY:
+ fragEnd.add(Calendar.DAY_OF_MONTH,
currInterval);
+ break;
+ case MONTH:
+ fragEnd.add(Calendar.MONTH, currInterval);
+ break;
+ case YEAR:
+ fragEnd.add(Calendar.YEAR, currInterval);
+ break;
+ }
+ if (fragEnd.after(rangeEnd))
+ fragEnd = (Calendar) rangeEnd.clone(); // clamp the last fragment to the end of the range
+
+ // The fragment bounds are stored as epoch milliseconds: start bytes followed by end bytes
+ byte[] msStart =
ByteUtil.getBytes(fragStart.getTimeInMillis());
+ byte[] msEnd =
ByteUtil.getBytes(fragEnd.getTimeInMillis());
+ fragmentMetadata = ArrayUtils.addAll(msStart, msEnd);
+
+ Fragment fragment = new
Fragment(inputData.getDataSource(), replicaHostAddressWrapped,
fragmentMetadata, fragmentUserdata);
+ fragments.add(fragment);
+
+ // The next fragment starts where this one ended
+ fragStart = fragEnd;
+ }
+ break;
+ }
+ case INT: {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getFragments() found INT partition");
+ }
+ int rangeStart = Integer.parseInt(range[0]); // local int shadows the rangeStart field used by the DATE branch
+ int rangeEnd = Integer.parseInt(range[1]); // local int shadows the rangeEnd field used by the DATE branch
+ int currInterval = intervalNum;
+
+ int fragStart = rangeStart;
+ while (fragStart < rangeEnd) {
+ int fragEnd = fragStart + currInterval; // NOTE(review): int addition, can wrap around for bounds near Integer.MAX_VALUE - confirm inputs are limited
+ if (fragEnd > rangeEnd) {
+ fragEnd = rangeEnd;
+ }
+
+ byte[] bStart = ByteUtil.getBytes(fragStart);
+ byte[] bEnd = ByteUtil.getBytes(fragEnd);
+ fragmentMetadata = ArrayUtils.addAll(bStart, bEnd);
+
+ Fragment fragment = new
Fragment(inputData.getDataSource(), replicaHostAddressWrapped,
fragmentMetadata, fragmentUserdata);
+ fragments.add(fragment);
+
+ // The next fragment starts where this one ended
+ fragStart = fragEnd;
+ }
+ break;
+ }
+ case ENUM: {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getFragments() found ENUM partition");
+ }
+ for (String frag : range) {
+ fragmentMetadata = frag.getBytes(); // no charset argument: the platform default charset is used
+ Fragment fragment = new
Fragment(inputData.getDataSource(), replicaHostAddressWrapped,
fragmentMetadata, fragmentUserdata);
+ fragments.add(fragment);
+ }
+ break;
+ }
+ default: {
+ throw new UnsupportedOperationException("getFragments()
found a partition of unknown type and failed");
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getFragments() successful");
+ }
+ return fragments;
+ }
+
+ /**
+ * Insert partition constraints into the prepared SQL query.
+ *
+ * @param inputData pre-validated PXF InputData
+ * @param sb the SQL query that is prepared for appending WHERE
constraints.
+ * Other SQL statements may be present, but they must be complete.
Note that no check is performed to check their "completeness"
+ */
+ public static StringBuilder buildFragmenterSql(InputData inputData,
StringBuilder sb) {
--- End diff --
Do we really want to mutate the input parameter `sb` and then return it as well?
---