[
https://issues.apache.org/jira/browse/FLINK-3786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15262285#comment-15262285
]
ASF GitHub Bot commented on FLINK-3786:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1928#discussion_r61443708
--- Diff:
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecComparator.java
---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+@Internal
+public final class BigDecComparator extends
BasicTypeComparator<BigDecimal> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final long SMALLEST_MAGNITUDE = Integer.MAX_VALUE;
+
+ private static final long LARGEST_MAGNITUDE = ((long)
Integer.MIN_VALUE) - Integer.MAX_VALUE + 1;
+
+ public BigDecComparator(boolean ascending) {
+ super(ascending);
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView
secondSource) throws IOException {
+ BigDecimal bd1 = BigDecSerializer.readBigDecimal(firstSource);
+ BigDecimal bd2 = BigDecSerializer.readBigDecimal(secondSource);
+ int comp = bd1.compareTo(bd2);
+ return ascendingComparison ? comp : -comp;
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false;
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return true;
+ }
+
+ /**
+ * Adds a normalized key containing a normalized order of magnitude of
the given record.
+ * 2 bits determine the sign (negative, zero, positive), 33 bits
determine the magnitude.
+ * This method adds at most 5 bytes that contain information.
+ */
+ @Override
+ public void putNormalizedKey(BigDecimal record, MemorySegment target,
int offset, int len) {
+ final long signum = record.signum();
+
+ // order of magnitude
+ // smallest:
+ // scale = Integer.MAX, precision = 1 => SMALLEST_MAGNITUDE
+ // largest:
+ // scale = Integer.MIN, precision = Integer.MAX =>
LARGEST_MAGNITUDE
+ final long mag = ((long) record.scale()) - ((long)
record.precision()) + 1;
+
+ // normalize value range: from 0 to (SMALLEST_MAGNITUDE +
-1*LARGEST_MAGNITUDE)
+ final long normMag = -1L * LARGEST_MAGNITUDE + mag;
+
+ // normalize value range dependent on sign:
+ // 0 to (SMALLEST_MAGNITUDE + -1*LARGEST_MAGNITUDE)
+ // OR (SMALLEST_MAGNITUDE + -1*LARGEST_MAGNITUDE) to 0
+ // --> uses at most 33 bit (5 least-significant bytes)
+ long signNormMag = signum < 0 ? normMag : (SMALLEST_MAGNITUDE +
-1L * LARGEST_MAGNITUDE - normMag);
+
+ // zero has no magnitude
+ // set 34th bit to flag zero
+ if (signum == 0) {
+ signNormMag = 0L;
+ signNormMag |= (1L << 34);
+ }
+ // set 35th bit to flag positive sign
+ else if (signum > 0) {
+ signNormMag |= (1L << 35);
+ }
+
+ // add 5 least-significant bytes that contain value to target
+ for (int i = 0; i < 5 && len > 0; i++, len--) {
+ final byte b = (byte) (signNormMag >>> (8 * (4 - i)));
+ target.put(offset++, b);
+ }
+
+ // fill remaining bytes with 0
+ for (; len > 0; len--) {
--- End diff --
Padding can be removed if `getNormalizeKeyLen()` returns 5.
> Add BigDecimal and BigInteger as Basic types
> --------------------------------------------
>
> Key: FLINK-3786
> URL: https://issues.apache.org/jira/browse/FLINK-3786
> Project: Flink
> Issue Type: New Feature
> Components: Core
> Reporter: Timo Walther
> Assignee: Timo Walther
>
> We already had the discussion on the mailing list some months ago about
> adding BigDecimal and BigInteger as basic types.
> Especially for business or scientific applications it
> makes sense to support the BigInteger and BigDecimal types natively. In
> my opinion they are as important as Date or Void and should be added as
> BasicTypes. The Table API would also benefit from it.
> http://mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/%[email protected]%3E
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)