IGNITE-4277: Hadoop: implemented "partially raw" comparator. This closes #1345.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1ddf21f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1ddf21f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1ddf21f Branch: refs/heads/master Commit: c1ddf21fd627c76a8b7e0d81ad43480b1f1e204d Parents: 30b869d Author: devozerov <[email protected]> Authored: Thu Dec 15 11:58:28 2016 +0300 Committer: devozerov <[email protected]> Committed: Thu Dec 15 13:46:41 2016 +0300 ---------------------------------------------------------------------- .../processors/hadoop/HadoopClassLoader.java | 1 + .../processors/hadoop/HadoopJobProperty.java | 6 +- .../processors/hadoop/HadoopTaskContext.java | 8 ++ .../io/PartiallyOffheapRawComparatorEx.java | 33 +++++ .../hadoop/io/PartiallyRawComparator.java | 33 +++++ .../org/apache/ignite/hadoop/io/RawMemory.java | 86 ++++++++++++ .../hadoop/io/TextPartiallyRawComparator.java | 115 ++++++++++++++++ .../apache/ignite/hadoop/io/package-info.java | 22 ++++ ...DelegatingPartiallyOffheapRawComparator.java | 54 ++++++++ .../hadoop/impl/v2/HadoopV2TaskContext.java | 21 +++ .../processors/hadoop/io/OffheapRawMemory.java | 131 +++++++++++++++++++ .../shuffle/collections/HadoopSkipList.java | 14 +- .../hadoop/impl/HadoopTeraSortTest.java | 7 + .../collections/HadoopAbstractMapTest.java | 6 + 14 files changed, 535 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java index f6c2fa9..81c1405 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java @@ -372,6 +372,7 @@ public class HadoopClassLoader extends URLClassLoader implements ClassCache { // We use "contains" instead of "equals" to handle subclasses properly. if (clsName.contains("org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem") || clsName.contains("org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem") || + clsName.contains("org.apache.ignite.hadoop.io.TextPartiallyRawComparator") || clsName.contains("org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider")) return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java index 9e1dede..4122eef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java @@ -64,6 +64,11 @@ public enum HadoopJobProperty { JOB_SHARED_CLASSLOADER("ignite.job.shared.classloader"), /** + * Fully qualified name of partially-raw comparator which should be used on sorting phase. + */ + JOB_PARTIAL_RAW_COMPARATOR("ignite.job.partial.raw.comparator"), + + /** * Size in bytes of single memory page which will be allocated for data structures in shuffle. * <p> * By default is {@code 32 * 1024}. @@ -112,7 +117,6 @@ public enum HadoopJobProperty { */ SHUFFLE_JOB_THROTTLE("ignite.shuffle.job.throttle"); - /** Property name.
*/ private final String propName; http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java index ecb9f26..dddd017 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java @@ -22,6 +22,7 @@ import java.util.concurrent.Callable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx; /** * Task context. @@ -157,6 +158,13 @@ public abstract class HadoopTaskContext { public abstract Comparator<Object> sortComparator(); /** + * Get semi-raw sorting comparator. + * + * @return Semi-raw sorting comparator. + */ + public abstract PartiallyOffheapRawComparatorEx<Object> partialRawSortComparator(); + + /** * Gets comparator for grouping on combine or reduce operation. * * @return Comparator. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/io/PartiallyOffheapRawComparatorEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/io/PartiallyOffheapRawComparatorEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/io/PartiallyOffheapRawComparatorEx.java new file mode 100644 index 0000000..157609e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/io/PartiallyOffheapRawComparatorEx.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.io; + +/** + * Special version of raw comparator allowing direct access to the underlying memory. + */ +public interface PartiallyOffheapRawComparatorEx<T> { + /** + * Perform compare. + * + * @param val1 First value. + * @param val2Ptr Pointer to the second value data. + * @param val2Len Length of the second value data. + * @return Result. 
+ */ + int compare(T val1, long val2Ptr, int val2Len); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/PartiallyRawComparator.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/PartiallyRawComparator.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/PartiallyRawComparator.java new file mode 100644 index 0000000..b9a4505 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/PartiallyRawComparator.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop.io; + +/** + * Partially raw comparator. Compares one deserialized value with serialized value. + */ +public interface PartiallyRawComparator<T> { + /** + * Do compare. + * + * @param val1 First value (deserialized). + * @param val2Buf Second value (serialized). + * @return A negative integer, zero, or a positive integer as this object is less than, equal to, or greater + * than the specified object. 
+ */ + int compare(T val1, RawMemory val2Buf); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/RawMemory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/RawMemory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/RawMemory.java new file mode 100644 index 0000000..8dcaf83 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/RawMemory.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop.io; + +/** + * Memory abstraction for raw comparison. + */ +public interface RawMemory { + /** + * Get byte value at the given index. + * + * @param idx Index. + * @return Value. + */ + byte get(int idx); + + /** + * Get short value at the given index. + * + * @param idx Index. + * @return Value. + */ + short getShort(int idx); + + /** + * Get char value at the given index. + * + * @param idx Index. + * @return Value. + */ + char getChar(int idx); + + /** + * Get int value at the given index. + * + * @param idx Index. + * @return Value. 
+ */ + int getInt(int idx); + + /** + * Get long value at the given index. + * + * @param idx Index. + * @return Value. + */ + long getLong(int idx); + + /** + * Get float value at the given index. + * + * @param idx Index. + * @return Value. + */ + float getFloat(int idx); + + /** + * Get double value at the given index. + * + * @param idx Index. + * @return Value. + */ + double getDouble(int idx); + + /** + * Get length. + * + * @return Length. + */ + int length(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java new file mode 100644 index 0000000..a2bc3d4 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.hadoop.io; + +import com.google.common.primitives.Longs; +import com.google.common.primitives.UnsignedBytes; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; +import org.apache.ignite.internal.processors.hadoop.io.OffheapRawMemory; +import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx; +import org.apache.ignite.internal.util.GridUnsafe; + +/** + * Partial raw comparator for {@link Text} data type. + * <p> + * Implementation is borrowed from {@code org.apache.hadoop.io.FastByteComparisons} and adopted to Ignite + * infrastructure. + */ +public class TextPartiallyRawComparator implements PartiallyRawComparator<Text>, PartiallyOffheapRawComparatorEx<Text> { + /** {@inheritDoc} */ + @Override public int compare(Text val1, RawMemory val2Buf) { + if (val2Buf instanceof OffheapRawMemory) { + OffheapRawMemory val2Buf0 = (OffheapRawMemory)val2Buf; + + return compare(val1, val2Buf0.pointer(), val2Buf0.length()); + } + else + throw new UnsupportedOperationException("Text can be compared only with offheap memory."); + } + + /** {@inheritDoc} */ + @Override public int compare(Text val1, long val2Ptr, int val2Len) { + int len2 = WritableUtils.decodeVIntSize(GridUnsafe.getByte(val2Ptr)); + + return compareBytes(val1.getBytes(), val1.getLength(), val2Ptr + len2, val2Len - len2); + } + + /** + * Internal comparison routine. + * + * @param buf1 Bytes 1. + * @param len1 Length 1. + * @param ptr2 Pointer 2. + * @param len2 Length 2. + * @return Result. 
+ */ + @SuppressWarnings("SuspiciousNameCombination") + private static int compareBytes(byte[] buf1, int len1, long ptr2, int len2) { + int minLength = Math.min(len1, len2); + + int minWords = minLength / Longs.BYTES; + + for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) { + long lw = GridUnsafe.getLong(buf1, GridUnsafe.BYTE_ARR_OFF + i); + long rw = GridUnsafe.getLong(ptr2 + i); + + long diff = lw ^ rw; + + if (diff != 0) { + if (GridUnsafe.BIG_ENDIAN) + return (lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE) ? -1 : 1; + + // Use binary search + int n = 0; + int y; + int x = (int) diff; + + if (x == 0) { + x = (int) (diff >>> 32); + + n = 32; + } + + y = x << 16; + + if (y == 0) + n += 16; + else + x = y; + + y = x << 8; + + if (y == 0) + n += 8; + + return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL)); + } + } + + // The epilogue to cover the last (minLength % 8) elements. + for (int i = minWords * Longs.BYTES; i < minLength; i++) { + int res = UnsignedBytes.compare(buf1[i], GridUnsafe.getByte(ptr2 + i)); + + if (res != 0) + return res; + } + + return len1 - len2; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/package-info.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/package-info.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/package-info.java new file mode 100644 index 0000000..0d1f7b9 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains <b>Hadoop Accelerator</b> API for input-output operations. + */ +package org.apache.ignite.hadoop.io; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2DelegatingPartiallyOffheapRawComparator.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2DelegatingPartiallyOffheapRawComparator.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2DelegatingPartiallyOffheapRawComparator.java new file mode 100644 index 0000000..e6d369e --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2DelegatingPartiallyOffheapRawComparator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.v2; + +import org.apache.ignite.hadoop.io.PartiallyRawComparator; +import org.apache.ignite.internal.processors.hadoop.io.OffheapRawMemory; +import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx; + +/** + * Delegating partial raw comparator. + */ +public class HadoopV2DelegatingPartiallyOffheapRawComparator<T> implements PartiallyOffheapRawComparatorEx<T> { + /** Target comparator. */ + private final PartiallyRawComparator<T> target; + + /** Memory. */ + private OffheapRawMemory mem; + + /** + * Constructor. + * + * @param target Target. 
+ */ + public HadoopV2DelegatingPartiallyOffheapRawComparator(PartiallyRawComparator<T> target) { + assert target != null; + + this.target = target; + } + + /** {@inheritDoc} */ + @Override public int compare(T val1, long val2Ptr, int val2Len) { + if (mem == null) + mem = new OffheapRawMemory(val2Ptr, val2Len); + else + mem.update(val2Ptr, val2Len); + + return target.compare(val1, mem); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java index d444f2b..42bbec5 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java @@ -38,13 +38,16 @@ import org.apache.hadoop.mapreduce.JobSubmissionFiles; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.hadoop.io.PartiallyRawComparator; import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit; import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; import org.apache.ignite.internal.processors.hadoop.HadoopJob; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty; import 
org.apache.ignite.internal.processors.hadoop.HadoopPartitioner; import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper; @@ -62,6 +65,7 @@ import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1MapTask; import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1Partitioner; import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1ReduceTask; import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1SetupTask; +import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx; import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; @@ -421,11 +425,28 @@ public class HadoopV2TaskContext extends HadoopTaskContext { } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public Comparator<Object> sortComparator() { return (Comparator<Object>)jobCtx.getSortComparator(); } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public PartiallyOffheapRawComparatorEx<Object> partialRawSortComparator() { + Class cls = jobCtx.getJobConf().getClass(HadoopJobProperty.JOB_PARTIAL_RAW_COMPARATOR.propertyName(), null); + + if (cls == null) + return null; + + Object res = ReflectionUtils.newInstance(cls, jobConf()); + + if (res instanceof PartiallyOffheapRawComparatorEx) + return (PartiallyOffheapRawComparatorEx)res; + else + return new HadoopV2DelegatingPartiallyOffheapRawComparator<>((PartiallyRawComparator)res); + } + + /** {@inheritDoc} */ @Override public Comparator<Object> groupComparator() { Comparator<?> res; http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/io/OffheapRawMemory.java ---------------------------------------------------------------------- diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/io/OffheapRawMemory.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/io/OffheapRawMemory.java new file mode 100644 index 0000000..564f92c --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/io/OffheapRawMemory.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.io; + +import org.apache.ignite.hadoop.io.RawMemory; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Offheap-based memory. + */ +public class OffheapRawMemory implements RawMemory { + /** Pointer. */ + private long ptr; + + /** Length. */ + private int len; + + /** + * Constructor. + * + * @param ptr Pointer. + * @param len Length. 
+ */ + public OffheapRawMemory(long ptr, int len) { + update(ptr, len); + } + + /** {@inheritDoc} */ + @Override public byte get(int idx) { + ensure(idx, 1); + + return GridUnsafe.getByte(ptr + idx); + } + + /** {@inheritDoc} */ + @Override public short getShort(int idx) { + ensure(idx, 2); + + return GridUnsafe.getShort(ptr + idx); + } + + /** {@inheritDoc} */ + @Override public char getChar(int idx) { + ensure(idx, 2); + + return GridUnsafe.getChar(ptr + idx); + } + + /** {@inheritDoc} */ + @Override public int getInt(int idx) { + ensure(idx, 4); + + return GridUnsafe.getInt(ptr + idx); + } + + /** {@inheritDoc} */ + @Override public long getLong(int idx) { + ensure(idx, 8); + + return GridUnsafe.getLong(ptr + idx); + } + + /** {@inheritDoc} */ + @Override public float getFloat(int idx) { + ensure(idx, 4); + + return GridUnsafe.getFloat(ptr + idx); + } + + /** {@inheritDoc} */ + @Override public double getDouble(int idx) { + ensure(idx, 8); + + return GridUnsafe.getDouble(ptr + idx); + } + + /** {@inheritDoc} */ + @Override public int length() { + return len; + } + + /** + * @return Raw pointer. + */ + public long pointer() { + return ptr; + } + + /** + * Update pointer and length. + * + * @param ptr Pointer. + * @param len Length. + */ + public void update(long ptr, int len) { + this.ptr = ptr; + this.len = len; + } + + /** + * Ensure that the given number of bytes are available for read. Throw an exception otherwise. + * + * @param idx Index. + * @param cnt Count. 
+ */ + private void ensure(int idx, int cnt) { + if (idx < 0 || idx + cnt - 1 >= len) + throw new IndexOutOfBoundsException("Illegal index [len=" + len + ", idx=" + idx + ']'); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(OffheapRawMemory.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java index 7db88bc..f300a18 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput; +import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.GridRandom; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; @@ -280,6 +281,9 @@ public class HadoopSkipList extends HadoopMultimapBase { private final Comparator<Object> cmp; /** */ + private final PartiallyOffheapRawComparatorEx<Object> partialRawCmp; + + /** */ private final Random rnd = new GridRandom(); /** */ @@ -298,6 +302,7 @@ public class HadoopSkipList extends HadoopMultimapBase { keyReader = new Reader(keySer); cmp = 
ctx.sortComparator(); + partialRawCmp = ctx.partialRawSortComparator(); } /** {@inheritDoc} */ @@ -475,7 +480,14 @@ public class HadoopSkipList extends HadoopMultimapBase { private int cmp(Object key, long meta) { assert meta != 0; - return cmp.compare(key, keyReader.readKey(meta)); + if (partialRawCmp != null) { + long keyPtr = key(meta); + int keySize = keySize(keyPtr); + + return partialRawCmp.compare(key, keyPtr + 4, keySize); + } + else + return cmp.compare(key, keyReader.readKey(meta)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java index 0cc9564..a016506 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java @@ -41,8 +41,10 @@ import org.apache.hadoop.util.ToolRunner; import org.apache.ignite.IgniteException; import org.apache.ignite.configuration.HadoopConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.io.TextPartiallyRawComparator; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty; import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo; @@ -161,6 +163,11 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest { jobConf.set("mapred.min.split.size", String.valueOf(splitSize)); jobConf.set("mapred.max.split.size", String.valueOf(splitSize)); + 
jobConf.setBoolean(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), true); + + jobConf.set(HadoopJobProperty.JOB_PARTIAL_RAW_COMPARATOR.propertyName(), + TextPartiallyRawComparator.class.getName()); + Job job = setupConfig(jobConf); HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1); http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java index 9d1fd4f..1f8978d 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopWritableSerialization; +import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; @@ -84,6 +85,11 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override public PartiallyOffheapRawComparatorEx<Object> partialRawSortComparator() { + return null; + } + + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public Comparator<Object> groupComparator() 
{ return ComparableComparator.getInstance();
