Github user sansanichfb commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171680865 --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgnitePartitionFragmenter.java --- @@ -0,0 +1,411 @@ +package org.apache.hawq.pxf.plugins.ignite; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hawq.pxf.api.Fragment; +import org.apache.hawq.pxf.api.Fragmenter; +import org.apache.hawq.pxf.api.FragmentsStats; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.InputData; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.lang.ArrayUtils; + + +/** + * PXF-Ignite fragmenter class + * + * This fragmenter works just like the one in PXF JDBC plugin + */ +public class IgnitePartitionFragmenter extends Fragmenter { + /** + * Class constructor + * + * @throws UserDataException if the request parameter is malformed + */ + public IgnitePartitionFragmenter(InputData inputData) throws UserDataException { + super(inputData); + if (LOG.isDebugEnabled()) { + LOG.debug("Constructor started"); + } + + if (inputData.getUserProperty("PARTITION_BY") == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Constructor successful; partition was not used"); + } + return; + } + + try { + partitionBy = inputData.getUserProperty("PARTITION_BY").split(":"); + partitionType = PartitionType.typeOf(partitionBy[1]); + } + catch (IllegalArgumentException | ArrayIndexOutOfBoundsException e1) { + throw new UserDataException("The parameter 'PARTITION_BY' is invalid. The pattern is 'column_name:DATE|INT|ENUM'"); + } + + //parse and validate parameter-RANGE + try { + String rangeStr = inputData.getUserProperty("RANGE"); + if (rangeStr != null) { + range = rangeStr.split(":"); + if (range.length == 1 && partitionType != PartitionType.ENUM) { + throw new UserDataException("The parameter 'RANGE' does not specify '[:end_value]'"); + } + } + else { + throw new UserDataException("The parameter 'RANGE' must be specified along with 'PARTITION_BY'"); + } + } + catch (IllegalArgumentException e) { + throw new UserDataException("The parameter 'RANGE' is invalid, the pattern is 'start_value[:end_value]'"); + } + + //parse and validate parameter-INTERVAL + try { + String intervalStr = inputData.getUserProperty("INTERVAL"); + if (intervalStr != null) { + interval = intervalStr.split(":"); + intervalNum = Integer.parseInt(interval[0]); + if (interval.length > 1) { + intervalType = IntervalType.typeOf(interval[1]); + } + if (interval.length == 1 && partitionType == PartitionType.DATE) { + throw new UserDataException("The parameter 'INTERVAL' does not specify unit [:year|month|day]"); + } + } + else if (partitionType != PartitionType.ENUM) { + throw new UserDataException("The parameter 'INTERVAL' must be specified along with 'PARTITION_BY'"); + } + if (intervalNum < 1) { + throw new UserDataException("The parameter 'INTERVAL' must be at least 1. The actual is '" + intervalNum + "'"); + } + } + catch (IllegalArgumentException e) { + throw new UserDataException("The parameter 'INTERVAL' invalid. The pattern is 'interval_num[:interval_unit]'"); + } + + //parse date values + try { + if (partitionType == PartitionType.DATE) { + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd"); + rangeStart = Calendar.getInstance(); + rangeStart.setTime(df.parse(range[0])); + rangeEnd = Calendar.getInstance(); + rangeEnd.setTime(df.parse(range[1])); + } + } catch (ParseException e) { + throw new UserDataException("The parameter 'RANGE' has invalid date format. Expected format is 'yyyy-MM-dd'"); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Constructor successful; some partition used"); + } + } + + /** + * Returns statistics for the Ignite table. This is not implemented in the current version + * @throws UnsupportedOperationException + */ + @Override + public FragmentsStats getFragmentsStats() throws UnsupportedOperationException { + throw new UnsupportedOperationException("ANALYZE for Ignite plugin is not supported"); + } + + /** + * Returns list of fragments for Ignite table queries + * + * @throws UnsupportedOperationException if a partition of unknown type was found + * + * @return a list of fragments + */ + @Override + public List<Fragment> getFragments() throws UnsupportedOperationException { + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments() called; dataSource is '" + inputData.getDataSource() + "'"); + } + + byte[] fragmentMetadata = null; + byte[] fragmentUserdata = null; + + if (partitionType == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments() found no partition"); + } + Fragment fragment = new Fragment(inputData.getDataSource(), replicaHostAddressWrapped, fragmentMetadata, fragmentUserdata); + fragments.add(fragment); + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments() successful"); + } + return fragments; + } + + switch (partitionType) { + case DATE: { + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments() found DATE partition"); + } + int currInterval = intervalNum; + + Calendar fragStart = rangeStart; + while (fragStart.before(rangeEnd)) { + Calendar fragEnd = (Calendar) fragStart.clone(); + switch (intervalType) { + case DAY: + fragEnd.add(Calendar.DAY_OF_MONTH, currInterval); + break; + case MONTH: + fragEnd.add(Calendar.MONTH, currInterval); + break; + case YEAR: + fragEnd.add(Calendar.YEAR, currInterval); + break; + } + if (fragEnd.after(rangeEnd)) + fragEnd = (Calendar) rangeEnd.clone(); + + // Note that the date is stored in milliseconds + byte[] msStart = ByteUtil.getBytes(fragStart.getTimeInMillis()); + byte[] msEnd = ByteUtil.getBytes(fragEnd.getTimeInMillis()); + fragmentMetadata = ArrayUtils.addAll(msStart, msEnd); + + Fragment fragment = new Fragment(inputData.getDataSource(), replicaHostAddressWrapped, fragmentMetadata, fragmentUserdata); + fragments.add(fragment); + + // Continue the previous fragment + fragStart = fragEnd; + } + break; + } + case INT: { + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments() found INT partition"); + } + int rangeStart = Integer.parseInt(range[0]); + int rangeEnd = Integer.parseInt(range[1]); + int currInterval = intervalNum; + + int fragStart = rangeStart; + while (fragStart < rangeEnd) { + int fragEnd = fragStart + currInterval; + if (fragEnd > rangeEnd) { + fragEnd = rangeEnd; + } + + byte[] bStart = ByteUtil.getBytes(fragStart); + byte[] bEnd = ByteUtil.getBytes(fragEnd); + fragmentMetadata = ArrayUtils.addAll(bStart, bEnd); + + Fragment fragment = new Fragment(inputData.getDataSource(), replicaHostAddressWrapped, fragmentMetadata, fragmentUserdata); + fragments.add(fragment); + + // Continue the previous fragment + fragStart = fragEnd; + } + break; + } + case ENUM: { + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments() found ENUM partition"); + } + for (String frag : range) { + fragmentMetadata = frag.getBytes(); + Fragment fragment = new Fragment(inputData.getDataSource(), replicaHostAddressWrapped, fragmentMetadata, fragmentUserdata); + fragments.add(fragment); + } + break; + } + default: { + throw new UnsupportedOperationException("getFragments() found a partition of unknown type and failed"); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments() successful"); + } + return fragments; + } + + /** + * Insert partition constraints into the prepared SQL query. + * + * @param inputData pre-validated PXF InputData + * @param sb the SQL query that is prepared for appending WHERE constraints. + * Other SQL statements may be present, but they must be complete. Note that no check is performed to check their "completeness" + */ + public static StringBuilder buildFragmenterSql(InputData inputData, StringBuilder sb) { --- End diff -- Since you are mutating initial query - I think it would be cleaner to do it in place and return `void`.
---