This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e8df4fb  [hotfix][table-planner] Port TimeIndicatorTypeInfo to table-common
e8df4fb is described below

commit e8df4fb1e50ffc353e807eaf1aa3ee106b16559a
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Tue May 7 18:09:37 2019 +0200

    [hotfix][table-planner] Port TimeIndicatorTypeInfo to table-common
    
    This closes #8363.
---
 .../table/typeutils/TimeIndicatorTypeInfo.java     | 77 ++++++++++++++++++++++
 .../table/typeutils/TimeIndicatorTypeInfo.scala    | 59 -----------------
 2 files changed, 77 insertions(+), 59 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java
new file mode 100644
index 0000000..e46bf3f
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampComparator;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer;
+
+import java.sql.Timestamp;
+
+/**
+ * Type information for indicating event or processing time. However, it behaves like a
+ * regular SQL timestamp but is serialized as Long.
+ */
+@Internal
+public class TimeIndicatorTypeInfo extends SqlTimeTypeInfo<Timestamp> {
+
+       private final boolean isEventTime;
+
+       public static final int ROWTIME_STREAM_MARKER = -1;
+       public static final int PROCTIME_STREAM_MARKER = -2;
+
+       public static final int ROWTIME_BATCH_MARKER = -3;
+       public static final int PROCTIME_BATCH_MARKER = -4;
+
+       public static final TimeIndicatorTypeInfo ROWTIME_INDICATOR = new 
TimeIndicatorTypeInfo(true);
+       public static final TimeIndicatorTypeInfo PROCTIME_INDICATOR = new 
TimeIndicatorTypeInfo(false);
+
+       @SuppressWarnings("unchecked")
+       protected TimeIndicatorTypeInfo(boolean isEventTime) {
+               super(Timestamp.class, SqlTimestampSerializer.INSTANCE, (Class) 
SqlTimestampComparator.class);
+               this.isEventTime = isEventTime;
+       }
+
+       // this replaces the effective serializer by a LongSerializer
+       // it is a hacky but efficient solution to keep the object creation overhead low but still
+       // be compatible with the corresponding SqlTimestampTypeInfo
+       @Override
+       @SuppressWarnings("unchecked")
+       public TypeSerializer<Timestamp> createSerializer(ExecutionConfig 
executionConfig) {
+               return (TypeSerializer) LongSerializer.INSTANCE;
+       }
+
+       public boolean isEventTime() {
+               return isEventTime;
+       }
+
+       @Override
+       public String toString() {
+               if (isEventTime) {
+                       return "TimeIndicatorTypeInfo(rowtime)";
+               } else {
+                       return "TimeIndicatorTypeInfo(proctime)";
+               }
+       }
+}
+
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
deleted file mode 100644
index ad82d52..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.typeutils
-
-import java.sql.Timestamp
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
-import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
-import org.apache.flink.api.common.typeutils.base.{LongSerializer, 
SqlTimestampComparator, SqlTimestampSerializer}
-
-/**
-  * Type information for indicating event or processing time. However, it behaves like a
-  * regular SQL timestamp but is serialized as Long.
-  */
-class TimeIndicatorTypeInfo(val isEventTime: Boolean)
-  extends SqlTimeTypeInfo[Timestamp](
-    classOf[Timestamp],
-    SqlTimestampSerializer.INSTANCE,
-    
classOf[SqlTimestampComparator].asInstanceOf[Class[TypeComparator[Timestamp]]]) 
{
-
-  // this replaces the effective serializer by a LongSerializer
-  // it is a hacky but efficient solution to keep the object creation overhead low but still
-  // be compatible with the corresponding SqlTimestampTypeInfo
-  override def createSerializer(executionConfig: ExecutionConfig): 
TypeSerializer[Timestamp] =
-    LongSerializer.INSTANCE.asInstanceOf[TypeSerializer[Timestamp]]
-
-  override def toString: String =
-    s"TimeIndicatorTypeInfo(${if (isEventTime) "rowtime" else "proctime" })"
-}
-
-object TimeIndicatorTypeInfo {
-
-  val ROWTIME_STREAM_MARKER: Int = -1
-  val PROCTIME_STREAM_MARKER: Int = -2
-
-  val ROWTIME_BATCH_MARKER: Int = -3
-  val PROCTIME_BATCH_MARKER: Int = -4
-
-  val ROWTIME_INDICATOR = new TimeIndicatorTypeInfo(true)
-  val PROCTIME_INDICATOR = new TimeIndicatorTypeInfo(false)
-
-}

Reply via email to