[
https://issues.apache.org/jira/browse/FLINK-3786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15262348#comment-15262348
]
ASF GitHub Bot commented on FLINK-3786:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1928#discussion_r61449670
--- Diff:
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java
---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+@Internal
+public final class BigDecSerializer extends
TypeSerializerSingleton<BigDecimal> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final BigDecSerializer INSTANCE = new BigDecSerializer();
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public BigDecimal createInstance() {
+ return BigDecimal.ZERO;
+ }
+
+ @Override
+ public BigDecimal copy(BigDecimal from) {
+ return from;
+ }
+
+ @Override
+ public BigDecimal copy(BigDecimal from, BigDecimal reuse) {
+ return from;
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(BigDecimal record, DataOutputView target) throws
IOException {
+ // null value support
+ if (record == null) {
+ BigIntSerializer.writeBigInteger(null, target);
+ return;
+ }
+ // fast paths for 0, 1, 10
+ else if (record == BigDecimal.ZERO) {
+ BigIntSerializer.writeBigInteger(BigInteger.ZERO,
target);
+ target.writeInt(0);
+ return;
+ }
+ else if (record == BigDecimal.ONE) {
+ BigIntSerializer.writeBigInteger(BigInteger.ONE,
target);
+ target.writeInt(0);
+ return;
+ }
+ else if (record == BigDecimal.TEN) {
+ BigIntSerializer.writeBigInteger(BigInteger.TEN,
target);
+ target.writeInt(0);
+ return;
+ }
+ // default
+ BigIntSerializer.writeBigInteger(record.unscaledValue(),
target);
+ target.writeInt(record.scale());
+ }
+
+ @Override
+ public BigDecimal deserialize(DataInputView source) throws IOException {
+ return readBigDecimal(source);
+ }
+
+ @Override
+ public BigDecimal deserialize(BigDecimal reuse, DataInputView source)
throws IOException {
+ return readBigDecimal(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws
IOException {
+ final boolean isNull = BigIntSerializer.copyBigInteger(source,
target);
+ if (!isNull) {
+ final int scale = source.readInt();
+ target.writeInt(scale);
+ }
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof BigDecSerializer;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Static Helpers for BigInteger Serialization
+ // --------------------------------------------------------------------------------------------
+
+ public static BigDecimal readBigDecimal(DataInputView source) throws
IOException {
+ final BigInteger unscaledValue =
BigIntSerializer.readBigInteger(source);
+ if (unscaledValue == null) {
+ return null;
+ }
+ final int scale = source.readInt();
+ // fast-path for 0, 1, 10
+ if (scale == 0) {
+ if (unscaledValue == BigInteger.ZERO) {
--- End diff --
Check with equals() instead of ==.
> Add BigDecimal and BigInteger as Basic types
> --------------------------------------------
>
> Key: FLINK-3786
> URL: https://issues.apache.org/jira/browse/FLINK-3786
> Project: Flink
> Issue Type: New Feature
> Components: Core
> Reporter: Timo Walther
> Assignee: Timo Walther
>
> We already had the discussion on the mailing list some months ago about
> adding BigDecimal and BigInteger as basic types.
> Especially for business or scientific applications it
> makes sense to support the BigInteger and BigDecimal types natively. In
> my opinion they are as important as Date or Void and should be added as
> BasicTypes. The Table API would also benefit from it.
> http://mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/%[email protected]%3E
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)