[FLINK-3366] Rename @Experimental annotation to @PublicEvolving

This closes #1599


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/572855da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/572855da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/572855da

Branch: refs/heads/tableOnCalcite
Commit: 572855daad452eab169bc2ca27f9f1e4476df656
Parents: 59b237b
Author: Fabian Hueske <[email protected]>
Authored: Mon Feb 8 14:14:01 2016 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Wed Feb 10 11:51:26 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/annotation/Experimental.java   | 35 -----------
 .../apache/flink/annotation/PublicEvolving.java | 40 ++++++++++++
 .../flink/api/common/ExecutionConfig.java       | 16 ++---
 .../flink/api/common/JobExecutionResult.java    |  4 +-
 .../functions/IterationRuntimeContext.java      |  4 +-
 .../api/common/functions/RuntimeContext.java    | 22 +++----
 .../util/AbstractRuntimeUDFContext.java         | 12 ++--
 .../common/io/statistics/BaseStatistics.java    | 14 ++---
 .../api/common/typeinfo/BasicArrayTypeInfo.java | 22 +++----
 .../api/common/typeinfo/BasicTypeInfo.java      | 22 +++----
 .../api/common/typeinfo/NothingTypeInfo.java    | 16 ++---
 .../common/typeinfo/PrimitiveArrayTypeInfo.java | 24 +++----
 .../api/common/typeinfo/TypeInformation.java    | 20 +++---
 .../api/common/typeutils/CompositeType.java     | 36 +++++------
 .../flink/api/java/typeutils/AvroTypeInfo.java  |  4 +-
 .../api/java/typeutils/EitherTypeInfo.java      | 18 +++---
 .../flink/api/java/typeutils/EnumTypeInfo.java  | 20 +++---
 .../api/java/typeutils/GenericTypeInfo.java     | 20 +++---
 .../api/java/typeutils/ObjectArrayTypeInfo.java | 22 +++----
 .../flink/api/java/typeutils/PojoTypeInfo.java  | 28 ++++-----
 .../flink/api/java/typeutils/TupleTypeInfo.java | 16 ++---
 .../flink/api/java/typeutils/TypeExtractor.java | 66 ++++++++++----------
 .../flink/api/java/typeutils/ValueTypeInfo.java | 24 +++----
 .../api/java/typeutils/WritableTypeInfo.java    | 22 +++----
 .../java/org/apache/flink/api/java/DataSet.java |  6 +-
 .../flink/api/java/ExecutionEnvironment.java    | 32 +++++-----
 .../apache/flink/api/java/LocalEnvironment.java |  4 +-
 .../flink/api/java/RemoteEnvironment.java       |  4 +-
 .../flink/api/java/functions/FirstReducer.java  |  1 +
 .../api/java/functions/FunctionAnnotation.java  | 10 +--
 .../org/apache/flink/api/java/io/CsvReader.java |  4 +-
 .../flink/api/java/operators/CrossOperator.java |  4 +-
 .../flink/api/java/operators/DataSink.java      |  6 +-
 .../flink/api/java/operators/DataSource.java    |  4 +-
 .../api/java/operators/DeltaIteration.java      |  6 +-
 .../api/java/operators/IterativeDataSet.java    |  8 +--
 .../flink/api/java/operators/JoinOperator.java  |  4 +-
 .../api/java/operators/ProjectOperator.java     |  4 +-
 .../flink/api/java/utils/DataSetUtils.java      |  4 +-
 .../flink/api/java/utils/ParameterTool.java     |  4 +-
 .../org/apache/flink/api/scala/DataSet.scala    |  8 +--
 .../flink/api/scala/ExecutionEnvironment.scala  | 34 +++++-----
 .../api/scala/typeutils/CaseClassTypeInfo.scala | 16 ++---
 .../api/scala/typeutils/EitherTypeInfo.scala    | 18 +++---
 .../api/scala/typeutils/EnumValueTypeInfo.scala | 20 +++---
 .../api/scala/typeutils/OptionTypeInfo.scala    | 18 +++---
 .../scala/typeutils/ScalaNothingTypeInfo.scala  | 16 ++---
 .../scala/typeutils/TraversableTypeInfo.scala   | 18 +++---
 .../flink/api/scala/typeutils/TryTypeInfo.scala | 18 +++---
 .../api/scala/typeutils/UnitTypeInfo.scala      | 16 ++---
 .../apache/flink/api/scala/utils/package.scala  |  5 +-
 .../api/datastream/AllWindowedStream.java       |  8 +--
 .../api/datastream/CoGroupedStreams.java        |  8 +--
 .../api/datastream/ConnectedStreams.java        |  4 +-
 .../streaming/api/datastream/DataStream.java    | 46 +++++++-------
 .../api/datastream/DataStreamSink.java          |  6 +-
 .../api/datastream/IterativeStream.java         |  4 +-
 .../streaming/api/datastream/JoinedStreams.java | 10 +--
 .../streaming/api/datastream/KeyedStream.java   |  6 +-
 .../datastream/SingleOutputStreamOperator.java  | 20 +++---
 .../streaming/api/datastream/SplitStream.java   |  4 +-
 .../api/datastream/WindowedStream.java          |  8 +--
 .../api/environment/CheckpointConfig.java       |  6 +-
 .../environment/StreamExecutionEnvironment.java | 36 +++++------
 .../source/EventTimeSourceFunction.java         |  4 +-
 .../api/functions/source/SourceFunction.java    |  6 +-
 .../streaming/api/scala/AllWindowedStream.scala |  6 +-
 .../streaming/api/scala/CoGroupedStreams.scala  | 10 +--
 .../flink/streaming/api/scala/DataStream.scala  | 57 ++++++++---------
 .../streaming/api/scala/JoinedStreams.scala     |  8 +--
 .../flink/streaming/api/scala/KeyedStream.scala |  4 +-
 .../api/scala/StreamExecutionEnvironment.scala  | 28 ++++-----
 .../streaming/api/scala/WindowedStream.scala    |  6 +-
 73 files changed, 561 insertions(+), 553 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-annotations/src/main/java/org/apache/flink/annotation/Experimental.java
----------------------------------------------------------------------
diff --git 
a/flink-annotations/src/main/java/org/apache/flink/annotation/Experimental.java 
b/flink-annotations/src/main/java/org/apache/flink/annotation/Experimental.java
deleted file mode 100644
index 3f4a661..0000000
--- 
a/flink-annotations/src/main/java/org/apache/flink/annotation/Experimental.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.flink.annotation;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Target;
-
-/**
- * Interface to mark methods within stable, public APIs as experimental.
- * It also allows to mark types explicitly as experimental
- *
- * An experimental API might change between minor releases.
- */
-@Documented
-@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, 
ElementType.CONSTRUCTOR })
-@Public
-public @interface Experimental {
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-annotations/src/main/java/org/apache/flink/annotation/PublicEvolving.java
----------------------------------------------------------------------
diff --git 
a/flink-annotations/src/main/java/org/apache/flink/annotation/PublicEvolving.java
 
b/flink-annotations/src/main/java/org/apache/flink/annotation/PublicEvolving.java
new file mode 100644
index 0000000..289d891
--- /dev/null
+++ 
b/flink-annotations/src/main/java/org/apache/flink/annotation/PublicEvolving.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.flink.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to mark classes and methods for public use but with evolving 
interfaces.
+ *
+ * Classes and methods with this annotation are intended for public use and 
have stable behavior.
+ * However, their interfaces and signatures are not considered to be stable 
and might be changed
+ * across versions.
+ *
+ * This annotation also excludes methods and classes with evolving interfaces 
/ signatures
+ * within classes annotated with {@link Public}.
+ *
+ */
+@Documented
+@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, 
ElementType.CONSTRUCTOR })
+@Public
+public @interface PublicEvolving {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 59ad5dd..8d5211b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.common;
 
 import com.esotericsoftware.kryo.Serializer;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 
 import java.io.Serializable;
@@ -153,7 +153,7 @@ public class ExecutionConfig implements Serializable {
         *
         * @param interval The interval between watermarks in milliseconds.
         */
-       @Experimental
+       @PublicEvolving
        public ExecutionConfig setAutoWatermarkInterval(long interval) {
                enableTimestamps();
                this.autoWatermarkInterval = interval;
@@ -171,7 +171,7 @@ public class ExecutionConfig implements Serializable {
         *
         * @see #setAutoWatermarkInterval(long)
         */
-       @Experimental
+       @PublicEvolving
        public ExecutionConfig enableTimestamps() {
                this.timestampsEnabled = true;
                return this;
@@ -182,7 +182,7 @@ public class ExecutionConfig implements Serializable {
         *
         * @see #enableTimestamps()
         */
-       @Experimental
+       @PublicEvolving
        public ExecutionConfig disableTimestamps() {
                this.timestampsEnabled = false;
                return this;
@@ -193,7 +193,7 @@ public class ExecutionConfig implements Serializable {
         *
         * @see #enableTimestamps()
         */
-       @Experimental
+       @PublicEvolving
        public boolean areTimestampsEnabled() {
                return timestampsEnabled;
        }
@@ -203,7 +203,7 @@ public class ExecutionConfig implements Serializable {
         *
         * @see #setAutoWatermarkInterval(long)
         */
-       @Experimental
+       @PublicEvolving
        public long getAutoWatermarkInterval()  {
                return this.autoWatermarkInterval;
        }
@@ -385,7 +385,7 @@ public class ExecutionConfig implements Serializable {
         * 
         * @param codeAnalysisMode see {@link CodeAnalysisMode}
         */
-       @Experimental
+       @PublicEvolving
        public void setCodeAnalysisMode(CodeAnalysisMode codeAnalysisMode) {
                this.codeAnalysisMode = codeAnalysisMode;
        }
@@ -393,7 +393,7 @@ public class ExecutionConfig implements Serializable {
        /**
         * Returns the {@link CodeAnalysisMode} of the program.
         */
-       @Experimental
+       @PublicEvolving
        public CodeAnalysisMode getCodeAnalysisMode() {
                return codeAnalysisMode;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java 
b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
index 1fff4da..7962fce 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.common;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 
 import java.util.Collections;
@@ -104,7 +104,7 @@ public class JobExecutionResult extends JobSubmissionResult 
{
         * @throws java.lang.ClassCastException Thrown, if the accumulator was 
not aggregating a {@link java.lang.Integer}
         */
        @Deprecated
-       @Experimental
+       @PublicEvolving
        public Integer getIntCounterResult(String accumulatorName) {
                Object result = this.accumulatorResults.get(accumulatorName);
                if (result == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
index 5dc4ec8..0019c32 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.common.functions;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.types.Value;
@@ -33,7 +33,7 @@ public interface IterationRuntimeContext extends 
RuntimeContext {
         */
        int getSuperstepNumber();
 
-       @Experimental
+       @PublicEvolving
        <T extends Aggregator<?>> T getIterationAggregator(String name);
        
        <T extends Value> T getPreviousIterationAggregate(String name);

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index c96ecde..86ca789 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -125,31 +125,31 @@ public interface RuntimeContext {
         * @deprecated Use getAccumulator(..) to obtain the value of an 
accumulator.
         */
        @Deprecated
-       @Experimental
+       @PublicEvolving
        Map<String, Accumulator<?, ?>> getAllAccumulators();
 
        /**
         * Convenience function to create a counter object for integers.
         */
-       @Experimental
+       @PublicEvolving
        IntCounter getIntCounter(String name);
 
        /**
         * Convenience function to create a counter object for longs.
         */
-       @Experimental
+       @PublicEvolving
        LongCounter getLongCounter(String name);
 
        /**
         * Convenience function to create a counter object for doubles.
         */
-       @Experimental
+       @PublicEvolving
        DoubleCounter getDoubleCounter(String name);
 
        /**
         * Convenience function to create a counter object for histograms.
         */
-       @Experimental
+       @PublicEvolving
        Histogram getHistogram(String name);
        
        // 
--------------------------------------------------------------------------------------------
@@ -239,7 +239,7 @@ public interface RuntimeContext {
         * @throws UnsupportedOperationException Thrown, if no partitioned 
state is available for the
         *                                       function (function is not part 
of a KeyedStream).
         */
-       @Experimental
+       @PublicEvolving
        <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties);
 
        /**
@@ -283,7 +283,7 @@ public interface RuntimeContext {
         * @throws UnsupportedOperationException Thrown, if no partitioned 
state is available for the
         *                                       function (function is not part 
os a KeyedStream).
         */
-       @Experimental
+       @PublicEvolving
        <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);
 
        /**
@@ -323,7 +323,7 @@ public interface RuntimeContext {
         * @throws UnsupportedOperationException Thrown, if no partitioned 
state is available for the
         *                                       function (function is not part 
of a KeyedStream).
         */
-       @Experimental
+       @PublicEvolving
        <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> 
stateProperties);
        
        /**
@@ -383,7 +383,7 @@ public interface RuntimeContext {
         * @deprecated Use the more expressive {@link 
#getState(ValueStateDescriptor)} instead.
         */
        @Deprecated
-       @Experimental
+       @PublicEvolving
        <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, 
S defaultState);
 
        /**
@@ -436,6 +436,6 @@ public interface RuntimeContext {
         * @deprecated Use the more expressive {@link 
#getState(ValueStateDescriptor)} instead.
         */
        @Deprecated
-       @Experimental
+       @PublicEvolving
        <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> 
stateType, S defaultState);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index a8078fb..a300f38 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import java.util.concurrent.Future;
 
 import com.google.common.base.Preconditions;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -176,21 +176,21 @@ public abstract class AbstractRuntimeUDFContext 
implements RuntimeContext {
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public <T> ValueState<T> getState(ValueStateDescriptor<T> 
stateProperties) {
                throw new UnsupportedOperationException(
                                "This state is only accessible by functions 
executed on a KeyedStream");
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public <T> ListState<T> getListState(ListStateDescriptor<T> 
stateProperties) {
                throw new UnsupportedOperationException(
                                "This state is only accessible by functions 
executed on a KeyedStream");
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> 
stateProperties) {
                throw new UnsupportedOperationException(
                                "This state is only accessible by functions 
executed on a KeyedStream");
@@ -198,7 +198,7 @@ public abstract class AbstractRuntimeUDFContext implements 
RuntimeContext {
 
        @Override
        @Deprecated
-       @Experimental
+       @PublicEvolving
        public <S> OperatorState<S> getKeyValueState(String name, Class<S> 
stateType, S defaultState) {
                throw new UnsupportedOperationException(
                                "This state is only accessible by functions 
executed on a KeyedStream");
@@ -206,7 +206,7 @@ public abstract class AbstractRuntimeUDFContext implements 
RuntimeContext {
 
        @Override
        @Deprecated
-       @Experimental
+       @PublicEvolving
        public <S> OperatorState<S> getKeyValueState(String name, 
TypeInformation<S> stateType, S defaultState) {
                throw new UnsupportedOperationException(
                                "This state is only accessible by functions 
executed on a KeyedStream");

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/common/io/statistics/BaseStatistics.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/statistics/BaseStatistics.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/statistics/BaseStatistics.java
index cb194ae..9411355 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/statistics/BaseStatistics.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/statistics/BaseStatistics.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.io.statistics;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 
 /**
@@ -31,19 +31,19 @@ public interface BaseStatistics {
        /**
         * Constant indicating that the input size is unknown.
         */
-       @Experimental
+       @PublicEvolving
        public static final long SIZE_UNKNOWN = -1;
        
        /**
         * Constant indicating that the number of records is unknown;
         */
-       @Experimental
+       @PublicEvolving
        public static final long NUM_RECORDS_UNKNOWN = -1;
        
        /**
         * Constant indicating that average record width is unknown.
         */
-       @Experimental
+       @PublicEvolving
        public static final float AVG_RECORD_BYTES_UNKNOWN = -1.0f;
        
        // 
--------------------------------------------------------------------------------------------
@@ -53,7 +53,7 @@ public interface BaseStatistics {
         *   
         * @return The total size of the input, in bytes.
         */
-       @Experimental
+       @PublicEvolving
        public long getTotalInputSize();
        
        /**
@@ -61,7 +61,7 @@ public interface BaseStatistics {
         * 
         * @return The number of records in the input.
         */
-       @Experimental
+       @PublicEvolving
        public long getNumberOfRecords();
        
        /**
@@ -69,6 +69,6 @@ public interface BaseStatistics {
         * 
         * @return The average width of a record in bytes.
         */
-       @Experimental
+       @PublicEvolving
        public float getAverageRecordWidth();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
index 995c2fe..2c61fb2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import java.util.Objects;
 
 import com.google.common.base.Preconditions;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -61,54 +61,54 @@ public final class BasicArrayTypeInfo<T, C> extends 
TypeInformation<T> {
        // 
--------------------------------------------------------------------------------------------
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isBasicType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isTupleType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getArity() {
                return 1;
        }
        
        @Override
-       @Experimental
+       @PublicEvolving
        public int getTotalFields() {
                return 1;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public Class<T> getTypeClass() {
                return this.arrayClass;
        }
 
-       @Experimental
+       @PublicEvolving
        public Class<C> getComponentTypeClass() {
                return this.componentInfo.getTypeClass();
        }
 
-       @Experimental
+       @PublicEvolving
        public TypeInformation<C> getComponentInfo() {
                return componentInfo;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isKeyType() {
                return false;
        }
 
        @Override
        @SuppressWarnings("unchecked")
-       @Experimental
+       @PublicEvolving
        public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
                // special case the string array
                if (componentInfo.getTypeClass().equals(String.class)) {
@@ -152,7 +152,7 @@ public final class BasicArrayTypeInfo<T, C> extends 
TypeInformation<T> {
        // 
--------------------------------------------------------------------------------------------
 
        @SuppressWarnings("unchecked")
-       @Experimental
+       @PublicEvolving
        public static <X, C> BasicArrayTypeInfo<X, C> getInfoFor(Class<X> type) 
{
                if (!type.isArray()) {
                        throw new InvalidTypesException("The given class is no 
array.");

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index 0b6b067..4eb70c1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import java.util.Objects;
 
 import com.google.common.base.Preconditions;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
@@ -100,7 +100,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> 
implements AtomicType<T
         * Returns whether this type should be automatically casted to
         * the target type in an arithmetic operation.
         */
-       @Experimental
+       @PublicEvolving
        public boolean shouldAutocastTo(BasicTypeInfo<?> to) {
                for (Class<?> possibleTo: possibleCastTargetTypes) {
                        if (possibleTo.equals(to.getTypeClass())) {
@@ -111,49 +111,49 @@ public class BasicTypeInfo<T> extends TypeInformation<T> 
implements AtomicType<T
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isBasicType() {
                return true;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isTupleType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getArity() {
                return 1;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getTotalFields() {
                return 1;
        }
        
        @Override
-       @Experimental
+       @PublicEvolving
        public Class<T> getTypeClass() {
                return this.clazz;
        }
        
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isKeyType() {
                return true;
        }
        
        @Override
-       @Experimental
+       @PublicEvolving
        public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
                return this.serializer;
        }
        
        @Override
-       @Experimental
+       @PublicEvolving
        public TypeComparator<T> createComparator(boolean sortOrderAscending, 
ExecutionConfig executionConfig) {
                if (comparatorClass != null) {
                        return instantiateComparator(comparatorClass, 
sortOrderAscending);
@@ -196,7 +196,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> 
implements AtomicType<T
        
        // 
--------------------------------------------------------------------------------------------
 
-       @Experimental
+       @PublicEvolving
        public static <X> BasicTypeInfo<X> getInfoFor(Class<X> type) {
                if (type == null) {
                        throw new NullPointerException();

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
index 1e60265..033a0e9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.common.typeinfo;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -33,43 +33,43 @@ public class NothingTypeInfo extends 
TypeInformation<Nothing> {
        private static final long serialVersionUID = 1L;
        
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isBasicType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isTupleType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getArity() {
                return 0;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getTotalFields() {
                return 0;
        }
        
        @Override
-       @Experimental
+       @PublicEvolving
        public Class<Nothing> getTypeClass() {
                return Nothing.class;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isKeyType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public TypeSerializer<Nothing> createSerializer(ExecutionConfig 
executionConfig) {
                throw new RuntimeException("The Nothing type cannot have a 
serializer.");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
index a4cf434..2c75458 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import java.util.Objects;
 
 import com.google.common.base.Preconditions;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
@@ -95,43 +95,43 @@ public class PrimitiveArrayTypeInfo<T> extends 
TypeInformation<T> implements Ato
        // 
--------------------------------------------------------------------------------------------
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isBasicType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isTupleType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getArity() {
                return 1;
        }
        
        @Override
-       @Experimental
+       @PublicEvolving
        public int getTotalFields() {
                return 1;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public Class<T> getTypeClass() {
                return this.arrayClass;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isKeyType() {
                return true;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
                return this.serializer;
        }
@@ -140,7 +140,7 @@ public class PrimitiveArrayTypeInfo<T> extends 
TypeInformation<T> implements Ato
         * Gets the class that represents the component type.
         * @return The class of the component type.
         */
-       @Experimental
+       @PublicEvolving
        public Class<?> getComponentClass() {
                return this.arrayClass.getComponentType();
        }
@@ -149,7 +149,7 @@ public class PrimitiveArrayTypeInfo<T> extends 
TypeInformation<T> implements Ato
         * Gets the type information of the component type.
         * @return The type information of the component type.
         */
-       @Experimental
+       @PublicEvolving
        public TypeInformation<?> getComponentType() {
                return BasicTypeInfo.getInfoFor(getComponentClass());
        }
@@ -195,7 +195,7 @@ public class PrimitiveArrayTypeInfo<T> extends 
TypeInformation<T> implements Ato
         * @throws InvalidTypesException Thrown, if the given class does not 
represent an array.
         */
        @SuppressWarnings("unchecked")
-       @Experimental
+       @PublicEvolving
        public static <X> PrimitiveArrayTypeInfo<X> getInfoFor(Class<X> type) {
                if (!type.isArray()) {
                        throw new InvalidTypesException("The given class is no 
array.");
@@ -221,7 +221,7 @@ public class PrimitiveArrayTypeInfo<T> extends 
TypeInformation<T> implements Ato
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public PrimitiveArrayComparator<T, ?> createComparator(boolean 
sortOrderAscending, ExecutionConfig executionConfig) {
                try {
                        return 
comparatorClass.getConstructor(boolean.class).newInstance(sortOrderAscending);

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index 8d96d34..1c95be0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.common.typeinfo;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -81,7 +81,7 @@ public abstract class TypeInformation<T> implements 
Serializable {
         *  
         * @return True, if this type information describes a basic type, false 
otherwise.
         */
-       @Experimental
+       @PublicEvolving
        public abstract boolean isBasicType();
        
        /**
@@ -90,7 +90,7 @@ public abstract class TypeInformation<T> implements 
Serializable {
         *  
         * @return True, if this type information describes a tuple type, false 
otherwise.
         */
-       @Experimental
+       @PublicEvolving
        public abstract boolean isTupleType();
        
        /**
@@ -98,7 +98,7 @@ public abstract class TypeInformation<T> implements 
Serializable {
         * 
         * @return Gets the number of fields in this type without nesting.
         */
-       @Experimental
+       @PublicEvolving
        public abstract int getArity();
        
        /**
@@ -109,7 +109,7 @@ public abstract class TypeInformation<T> implements 
Serializable {
         * 
         * @return The number of fields in this type, including its sub-fields 
(for composite types) 
         */
-       @Experimental
+       @PublicEvolving
        public abstract int getTotalFields();
        
        /**
@@ -117,7 +117,7 @@ public abstract class TypeInformation<T> implements 
Serializable {
         *  
         * @return The class of the type represented by this type information.
         */
-       @Experimental
+       @PublicEvolving
        public abstract Class<T> getTypeClass();
 
        /**
@@ -125,7 +125,7 @@ public abstract class TypeInformation<T> implements 
Serializable {
         *
         * @return The list of generic parameters. This list can be empty.
         */
-       @Experimental
+       @PublicEvolving
        public List<TypeInformation<?>> getGenericParameters() {
                // Return an empty list as the default implementation
                return new LinkedList<>();
@@ -137,14 +137,14 @@ public abstract class TypeInformation<T> implements 
Serializable {
         *  
         * @return True, if the type can be used as a key, false otherwise.
         */
-       @Experimental
+       @PublicEvolving
        public abstract boolean isKeyType();
 
        /**
         * Checks whether this type can be used as a key for sorting.
         * The order produced by sorting this type must be meaningful.
         */
-       @Experimental
+       @PublicEvolving
        public boolean isSortKeyType() {
                return isKeyType();
        }
@@ -156,7 +156,7 @@ public abstract class TypeInformation<T> implements 
Serializable {
         * @param config The config used to parameterize the serializer.
         * @return A serializer for this type.
         */
-       @Experimental
+       @PublicEvolving
        public abstract TypeSerializer<T> createSerializer(ExecutionConfig 
config);
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index 959750a..19b6eaf 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -23,7 +23,7 @@ import java.util.List;
 import java.util.Objects;
 
 import com.google.common.base.Preconditions;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -42,7 +42,7 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
        
        private final Class<T> typeClass;
 
-       @Experimental
+       @PublicEvolving
        public CompositeType(Class<T> typeClass) {
                this.typeClass = Preconditions.checkNotNull(typeClass);
        }
@@ -52,7 +52,7 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
         *
         * @return Type class of the composite type
         */
-       @Experimental
+       @PublicEvolving
        public Class<T> getTypeClass() {
                return typeClass;
        }
@@ -63,7 +63,7 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
         * @param fieldExpression The field expression for which the flat field 
descriptors are computed.
         * @return The list of descriptors for the flat fields which are 
specified by the field expression.
         */
-       @Experimental
+       @PublicEvolving
        public List<FlatFieldDescriptor> getFlatFields(String fieldExpression) {
                List<FlatFieldDescriptor> result = new 
ArrayList<FlatFieldDescriptor>();
                this.getFlatFields(fieldExpression, 0, result);
@@ -77,7 +77,7 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
         * @param offset The offset to use when computing the positions of the 
flat fields.
         * @param result The list into which all flat field descriptors are 
inserted.
         */
-       @Experimental
+       @PublicEvolving
        public abstract void getFlatFields(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result);
 
        /**
@@ -87,7 +87,7 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
         * @param fieldExpression The field expression for which the field of 
which the type is returned.
         * @return The type of the field at the given field expression.
         */
-       @Experimental
+       @PublicEvolving
        public abstract <X> TypeInformation<X> getTypeAt(String 
fieldExpression);
 
        /**
@@ -96,10 +96,10 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
         * @param pos The position of the (unnested) field in this composite 
type.
         * @return The type of the field at the given position.
         */
-       @Experimental
+       @PublicEvolving
        public abstract <X> TypeInformation<X> getTypeAt(int pos);
 
-       @Experimental
+       @PublicEvolving
        protected abstract TypeComparatorBuilder<T> 
createTypeComparatorBuilder();
        
        /**
@@ -107,7 +107,7 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
         * to create the actual comparators
         * @return The comparator
         */
-       @Experimental
+       @PublicEvolving
        public TypeComparator<T> createComparator(int[] logicalKeyFields, 
boolean[] orders, int logicalFieldOffset, ExecutionConfig config) {
 
                TypeComparatorBuilder<T> builder = 
createTypeComparatorBuilder();
@@ -169,7 +169,7 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
 
        // 
--------------------------------------------------------------------------------------------
 
-       @Experimental
+       @PublicEvolving
        protected interface TypeComparatorBuilder<T> {
                void initializeTypeComparatorBuilder(int size);
 
@@ -178,7 +178,7 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
                TypeComparator<T> createTypeComparator(ExecutionConfig config);
        }
 
-       @Experimental
+       @PublicEvolving
        public static class FlatFieldDescriptor {
                private int keyPosition;
                private TypeInformation<?> type;
@@ -209,13 +209,13 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
        /**
         * Returns true when this type has a composite field with the given 
name.
         */
-       @Experimental
+       @PublicEvolving
        public boolean hasField(String fieldName) {
                return getFieldIndex(fieldName) >= 0;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isKeyType() {
                for(int i=0;i<this.getArity();i++) {
                        if (!this.getTypeAt(i).isKeyType()) {
@@ -226,7 +226,7 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isSortKeyType() {
                for(int i=0;i<this.getArity();i++) {
                        if (!this.getTypeAt(i).isSortKeyType()) {
@@ -240,7 +240,7 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
         * Returns the names of the composite fields of this type. The order of 
the returned array must
         * be consistent with the internal field index ordering.
         */
-       @Experimental
+       @PublicEvolving
        public abstract String[] getFieldNames();
 
        /**
@@ -252,7 +252,7 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
         * This is used when translating a DataSet or DataStream to an 
Expression Table, when
         * initially renaming the fields of the underlying type.
         */
-       @Experimental
+       @PublicEvolving
        public boolean hasDeterministicFieldOrder() {
                return false;
        }
@@ -262,10 +262,10 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
         *
         * @return The field index or -1 if this type does not have a field of 
the given name.
         */
-       @Experimental
+       @PublicEvolving
        public abstract int getFieldIndex(String fieldName);
 
-       @Experimental
+       @PublicEvolving
        public static class InvalidFieldReferenceException extends 
IllegalArgumentException {
 
                private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
index 3a93dfb..0132eff 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
@@ -20,7 +20,7 @@
 package org.apache.flink.api.java.typeutils;
 
 import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
@@ -43,7 +43,7 @@ import java.util.List;
  */
 @Public
 public class AvroTypeInfo<T extends SpecificRecordBase> extends 
PojoTypeInfo<T> {
-       @Experimental
+       @PublicEvolving
        public AvroTypeInfo(Class<T> typeClass) {
                super(typeClass, generateFieldsFromAvroSchema(typeClass));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
index fdd101c..058de12 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -41,51 +41,51 @@ public class EitherTypeInfo<L, R> extends 
TypeInformation<Either<L, R>> {
 
        private final TypeInformation<R> rightType;
 
-       @Experimental
+       @PublicEvolving
        public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> 
rightType) {
                this.leftType = leftType;
                this.rightType = rightType;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isBasicType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isTupleType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getArity() {
                return 1;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getTotalFields() {
                return 1;
        }
 
        @SuppressWarnings("unchecked")
        @Override
-       @Experimental
+       @PublicEvolving
        public Class<Either<L, R>> getTypeClass() {
                return (Class<Either<L, R>>) (Class<?>) Either.class;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isKeyType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public TypeSerializer<Either<L, R>> createSerializer(ExecutionConfig 
config) {
                return new EitherSerializer<L, 
R>(leftType.createSerializer(config),
                                rightType.createSerializer(config));

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
index eb8dd6d..aec3c1d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.java.typeutils;
 
 import com.google.common.base.Preconditions;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -41,7 +41,7 @@ public class EnumTypeInfo<T extends Enum<T>> extends 
TypeInformation<T> implemen
        
        private final Class<T> typeClass;
 
-       @Experimental
+       @PublicEvolving
        public EnumTypeInfo(Class<T> typeClass) {
                Preconditions.checkNotNull(typeClass, "Enum type class must not 
be null.");
 
@@ -53,49 +53,49 @@ public class EnumTypeInfo<T extends Enum<T>> extends 
TypeInformation<T> implemen
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public TypeComparator<T> createComparator(boolean sortOrderAscending, 
ExecutionConfig executionConfig) {
                return new EnumComparator<T>(sortOrderAscending);
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isBasicType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isTupleType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getArity() {
                return 1;
        }
        
        @Override
-       @Experimental
+       @PublicEvolving
        public int getTotalFields() {
                return 1;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public Class<T> getTypeClass() {
                return this.typeClass;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isKeyType() {
                return true;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
                return new EnumSerializer<T>(typeClass);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 7e66928..0cca8bd 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.java.typeutils;
 
 import com.google.common.base.Preconditions;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -37,56 +37,56 @@ public class GenericTypeInfo<T> extends TypeInformation<T> 
implements AtomicType
        
        private final Class<T> typeClass;
 
-       @Experimental
+       @PublicEvolving
        public GenericTypeInfo(Class<T> typeClass) {
                this.typeClass = Preconditions.checkNotNull(typeClass);
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isBasicType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isTupleType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getArity() {
                return 1;
        }
        
        @Override
-       @Experimental
+       @PublicEvolving
        public int getTotalFields() {
                return 1;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public Class<T> getTypeClass() {
                return typeClass;
        }
        
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isKeyType() {
                return Comparable.class.isAssignableFrom(typeClass);
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public TypeSerializer<T> createSerializer(ExecutionConfig config) {
                return new KryoSerializer<T>(this.typeClass, config);
        }
 
        @SuppressWarnings("unchecked")
        @Override
-       @Experimental
+       @PublicEvolving
        public TypeComparator<T> createComparator(boolean sortOrderAscending, 
ExecutionConfig executionConfig) {
                if (isKeyType()) {
                        @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
index 024fe59..1e8fbe2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java.typeutils;
 
 import java.lang.reflect.Array;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -45,50 +45,50 @@ public class ObjectArrayTypeInfo<T, C> extends 
TypeInformation<T> {
        // 
--------------------------------------------------------------------------------------------
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isBasicType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isTupleType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getArity() {
                return 1;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getTotalFields() {
                return 1;
        }
 
        @SuppressWarnings("unchecked")
        @Override
-       @Experimental
+       @PublicEvolving
        public Class<T> getTypeClass() {
                return arrayType;
        }
 
-       @Experimental
+       @PublicEvolving
        public TypeInformation<C> getComponentInfo() {
                return componentInfo;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isKeyType() {
                return false;
        }
 
        @SuppressWarnings("unchecked")
        @Override
-       @Experimental
+       @PublicEvolving
        public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
                return (TypeSerializer<T>) new GenericArraySerializer<C>(
                        componentInfo.getTypeClass(),
@@ -126,7 +126,7 @@ public class ObjectArrayTypeInfo<T, C> extends 
TypeInformation<T> {
 
        // 
--------------------------------------------------------------------------------------------
 
-       @Experimental
+       @PublicEvolving
        public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Class<T> 
arrayClass, TypeInformation<C> componentInfo) {
                Preconditions.checkNotNull(arrayClass);
                Preconditions.checkNotNull(componentInfo);
@@ -144,7 +144,7 @@ public class ObjectArrayTypeInfo<T, C> extends 
TypeInformation<T> {
         * {@link java.lang.reflect.Type} or {@link java.lang.Class}.
         */
        @SuppressWarnings("unchecked")
-       @Experimental
+       @PublicEvolving
        public static <T, C> ObjectArrayTypeInfo<T, C> 
getInfoFor(TypeInformation<C> componentInfo) {
                Preconditions.checkNotNull(componentInfo);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 39d3bcd..be2a027 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -29,7 +29,7 @@ import java.util.regex.Pattern;
 
 import com.google.common.base.Preconditions;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
@@ -74,7 +74,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
        
        private final int totalFields;
 
-       @Experimental
+       @PublicEvolving
        public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
                super(typeClass);
 
@@ -100,32 +100,32 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isBasicType() {
                return false;
        }
 
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isTupleType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getArity() {
                return fields.length;
        }
        
        @Override
-       @Experimental
+       @PublicEvolving
        public int getTotalFields() {
                return totalFields;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isSortKeyType() {
                // Support for sorting POJOs that implement Comparable is not 
implemented yet.
                // Since the order of fields in a POJO type is not well 
defined, sorting on fields
@@ -135,7 +135,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
        
 
        @Override
-       @Experimental
+       @PublicEvolving
        public void getFlatFields(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
 
                Matcher matcher = 
PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
@@ -212,7 +212,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 
        @SuppressWarnings("unchecked")
        @Override
-       @Experimental
+       @PublicEvolving
        public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
 
                Matcher matcher = 
PATTERN_NESTED_FIELDS.matcher(fieldExpression);
@@ -253,7 +253,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public <X> TypeInformation<X> getTypeAt(int pos) {
                if (pos < 0 || pos >= this.fields.length) {
                        throw new IndexOutOfBoundsException();
@@ -269,7 +269,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
        }
 
        // used for testing. Maybe use mockito here
-       @Experimental
+       @PublicEvolving
        public PojoField getPojoFieldAt(int pos) {
                if (pos < 0 || pos >= this.fields.length) {
                        throw new IndexOutOfBoundsException();
@@ -277,7 +277,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
                return this.fields[pos];
        }
 
-       @Experimental
+       @PublicEvolving
        public String[] getFieldNames() {
                String[] result = new String[fields.length];
                for (int i = 0; i < fields.length; i++) {
@@ -287,7 +287,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getFieldIndex(String fieldName) {
                for (int i = 0; i < fields.length; i++) {
                        if (fields[i].getField().getName().equals(fieldName)) {
@@ -298,7 +298,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public TypeSerializer<T> createSerializer(ExecutionConfig config) {
                if(config.isForceKryoEnabled()) {
                        return new KryoSerializer<T>(getTypeClass(), config);

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index 4d1927d..9ecbe73 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -24,7 +24,7 @@ import java.util.Collections;
 
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Ints;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
@@ -53,12 +53,12 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
        protected final String[] fieldNames;
 
        @SuppressWarnings("unchecked")
-       @Experimental
+       @PublicEvolving
        public TupleTypeInfo(TypeInformation<?>... types) {
                this((Class<T>) Tuple.getTupleClass(types.length), types);
        }
 
-       @Experimental
+       @PublicEvolving
        public TupleTypeInfo(Class<T> tupleType, TypeInformation<?>... types) {
                super(tupleType, types);
 
@@ -74,13 +74,13 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public String[] getFieldNames() {
                return fieldNames;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getFieldIndex(String fieldName) {
                int fieldIndex = Integer.parseInt(fieldName.substring(1));
                if (fieldIndex >= getArity()) {
@@ -91,7 +91,7 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
 
        @SuppressWarnings("unchecked")
        @Override
-       @Experimental
+       @PublicEvolving
        public TupleSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
                if (getTypeClass() == Tuple0.class) {
                        return (TupleSerializer<T>) Tuple0Serializer.INSTANCE;
@@ -199,7 +199,7 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
 
        // 
--------------------------------------------------------------------------------------------
 
-       @Experimental
+       @PublicEvolving
        public static <X extends Tuple> TupleTypeInfo<X> 
getBasicTupleTypeInfo(Class<?>... basicTypes) {
                if (basicTypes == null || basicTypes.length == 0) {
                        throw new IllegalArgumentException();
@@ -225,7 +225,7 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
        }
 
        @SuppressWarnings("unchecked")
-       @Experimental
+       @PublicEvolving
        public static <X extends Tuple> TupleTypeInfo<X> 
getBasicAndBasicValueTupleTypeInfo(Class<?>... basicTypes) {
                if (basicTypes == null || basicTypes.length == 0) {
                        throw new IllegalArgumentException();

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index de3503b..01afe14 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -32,7 +32,7 @@ import java.util.List;
 import org.apache.avro.specific.SpecificRecordBase;
 
 import org.apache.commons.lang3.ClassUtils;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.CoGroupFunction;
@@ -108,12 +108,12 @@ public class TypeExtractor {
        //  Function specific methods
        // 
--------------------------------------------------------------------------------------------
 
-       @Experimental
+       @PublicEvolving
        public static <IN, OUT> TypeInformation<OUT> 
getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> 
inType) {
                return getMapReturnTypes(mapInterface, inType, null, false);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN, OUT> TypeInformation<OUT> 
getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType,
                        String functionName, boolean allowMissing)
        {
@@ -121,74 +121,74 @@ public class TypeExtractor {
        }
        
 
-       @Experimental
+       @PublicEvolving
        public static <IN, OUT> TypeInformation<OUT> 
getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, 
TypeInformation<IN> inType) {
                return getFlatMapReturnTypes(flatMapInterface, inType, null, 
false);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN, OUT> TypeInformation<OUT> 
getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, 
TypeInformation<IN> inType,
                        String functionName, boolean allowMissing)
        {
                return getUnaryOperatorReturnType((Function) flatMapInterface, 
FlatMapFunction.class, false, true, inType, functionName, allowMissing);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN, OUT> TypeInformation<OUT> 
getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> 
inType)
        {
                return getFoldReturnTypes(foldInterface, inType, null, false);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN, OUT> TypeInformation<OUT> 
getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> 
inType, String functionName, boolean allowMissing)
        {
                return getUnaryOperatorReturnType((Function) foldInterface, 
FoldFunction.class, false, false, inType, functionName, allowMissing);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN, OUT> TypeInformation<OUT> 
getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, 
TypeInformation<IN> inType) {
                return getMapPartitionReturnTypes(mapPartitionInterface, 
inType, null, false);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN, OUT> TypeInformation<OUT> 
getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, 
TypeInformation<IN> inType,
                        String functionName, boolean allowMissing)
        {
                return getUnaryOperatorReturnType((Function) 
mapPartitionInterface, MapPartitionFunction.class, true, true, inType, 
functionName, allowMissing);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN, OUT> TypeInformation<OUT> 
getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface, 
TypeInformation<IN> inType) {
                return getGroupReduceReturnTypes(groupReduceInterface, inType, 
null, false);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN, OUT> TypeInformation<OUT> 
getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface, 
TypeInformation<IN> inType,
                        String functionName, boolean allowMissing)
        {
                return getUnaryOperatorReturnType((Function) 
groupReduceInterface, GroupReduceFunction.class, true, true, inType, 
functionName, allowMissing);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN, OUT> TypeInformation<OUT> 
getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, 
TypeInformation<IN> inType) {
                return getGroupCombineReturnTypes(combineInterface, inType, 
null, false);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN, OUT> TypeInformation<OUT> 
getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, 
TypeInformation<IN> inType,
                                                                                
                                                                        String 
functionName, boolean allowMissing)
        {
                return getUnaryOperatorReturnType((Function) combineInterface, 
GroupCombineFunction.class, true, true, inType, functionName, allowMissing);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN1, IN2, OUT> TypeInformation<OUT> 
getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type)
        {
                return getFlatJoinReturnTypes(joinInterface, in1Type, in2Type, 
null, false);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN1, IN2, OUT> TypeInformation<OUT> 
getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type, String functionName, boolean allowMissing)
        {
@@ -196,14 +196,14 @@ public class TypeExtractor {
                                in1Type, in2Type, functionName, allowMissing);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN1, IN2, OUT> TypeInformation<OUT> 
getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type)
        {
                return getJoinReturnTypes(joinInterface, in1Type, in2Type, 
null, false);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN1, IN2, OUT> TypeInformation<OUT> 
getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type, String functionName, boolean allowMissing)
        {
@@ -211,14 +211,14 @@ public class TypeExtractor {
                                in1Type, in2Type, functionName, allowMissing);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN1, IN2, OUT> TypeInformation<OUT> 
getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type)
        {
                return getCoGroupReturnTypes(coGroupInterface, in1Type, 
in2Type, null, false);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN1, IN2, OUT> TypeInformation<OUT> 
getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type, String functionName, boolean allowMissing)
        {
@@ -226,14 +226,14 @@ public class TypeExtractor {
                                in1Type, in2Type, functionName, allowMissing);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN1, IN2, OUT> TypeInformation<OUT> 
getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type)
        {
                return getCrossReturnTypes(crossInterface, in1Type, in2Type, 
null, false);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN1, IN2, OUT> TypeInformation<OUT> 
getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type, String functionName, boolean allowMissing)
        {
@@ -241,31 +241,31 @@ public class TypeExtractor {
                                in1Type, in2Type, functionName, allowMissing);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN, OUT> TypeInformation<OUT> 
getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface, TypeInformation<IN> 
inType) {
                return getKeySelectorTypes(selectorInterface, inType, null, 
false);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN, OUT> TypeInformation<OUT> 
getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface,
                        TypeInformation<IN> inType, String functionName, 
boolean allowMissing)
        {
                return getUnaryOperatorReturnType((Function) selectorInterface, 
KeySelector.class, false, false, inType, functionName, allowMissing);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <T> TypeInformation<T> getPartitionerTypes(Partitioner<T> 
partitioner) {
                return getPartitionerTypes(partitioner, null, false);
        }
 
-       @Experimental
+       @PublicEvolving
        public static <T> TypeInformation<T> getPartitionerTypes(Partitioner<T> 
partitioner, String functionName, boolean allowMissing) {
                return new 
TypeExtractor().privateCreateTypeInfo(Partitioner.class, 
partitioner.getClass(), 0, null, null);
        }
        
        
        @SuppressWarnings("unchecked")
-       @Experimental
+       @PublicEvolving
        public static <IN> TypeInformation<IN> 
getInputFormatTypes(InputFormat<IN, ?> inputFormatInterface) {
                if (inputFormatInterface instanceof ResultTypeQueryable) {
                        return ((ResultTypeQueryable<IN>) 
inputFormatInterface).getProducedType();
@@ -278,7 +278,7 @@ public class TypeExtractor {
        // 
--------------------------------------------------------------------------------------------
        
        @SuppressWarnings("unchecked")
-       @Experimental
+       @PublicEvolving
        public static <IN, OUT> TypeInformation<OUT> 
getUnaryOperatorReturnType(Function function, Class<?> baseClass, 
                        boolean hasIterable, boolean hasCollector, 
TypeInformation<IN> inType,
                        String functionName, boolean allowMissing)
@@ -316,7 +316,7 @@ public class TypeExtractor {
        }
        
        @SuppressWarnings("unchecked")
-       @Experimental
+       @PublicEvolving
        public static <IN1, IN2, OUT> TypeInformation<OUT> 
getBinaryOperatorReturnType(Function function, Class<?> baseClass,
                        boolean hasIterables, boolean hasCollector, 
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type,
                        String functionName, boolean allowMissing)
@@ -388,7 +388,7 @@ public class TypeExtractor {
         * @return type information
         */
        @SuppressWarnings("unchecked")
-       @Experimental
+       @PublicEvolving
        public static <OUT> TypeInformation<OUT> createTypeInfo(Object 
instance, Class<?> baseClass, Class<?> clazz, int returnParamPos) {
                if (instance instanceof ResultTypeQueryable) {
                        return ((ResultTypeQueryable<OUT>) 
instance).getProducedType();
@@ -397,7 +397,7 @@ public class TypeExtractor {
                }
        }
 
-       @Experimental
+       @PublicEvolving
        public static <IN1, IN2, OUT> TypeInformation<OUT> 
createTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type) {
                TypeInformation<OUT> ti =  new 
TypeExtractor().privateCreateTypeInfo(baseClass, clazz, returnParamPos, 
in1Type, in2Type);
@@ -790,7 +790,7 @@ public class TypeExtractor {
        //  Extract type parameters
        // 
--------------------------------------------------------------------------------------------
 
-       @Experimental
+       @PublicEvolving
        public static Type getParameterType(Class<?> baseClass, Class<?> clazz, 
int pos) {
                return getParameterType(baseClass, null, clazz, pos);
        }
@@ -1581,7 +1581,7 @@ public class TypeExtractor {
         * This is required because class.getFields() is not returning fields 
defined
         * in parent classes.
         */
-       @Experimental
+       @PublicEvolving
        public static List<Field> getAllDeclaredFields(Class<?> clazz) {
                List<Field> result = new ArrayList<Field>();
                while (clazz != null) {
@@ -1601,7 +1601,7 @@ public class TypeExtractor {
                return result;
        }
 
-       @Experimental
+       @PublicEvolving
        public static Field getDeclaredField(Class<?> clazz, String name) {
                for (Field field : getAllDeclaredFields(clazz)) {
                        if (field.getName().equals(name)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
index 9f30716..7c173c0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.java.typeutils;
 
 import com.google.common.base.Preconditions;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
@@ -68,7 +68,7 @@ public class ValueTypeInfo<T extends Value> extends 
TypeInformation<T> implement
 
        private final Class<T> type;
 
-       @Experimental
+       @PublicEvolving
        public ValueTypeInfo(Class<T> type) {
                this.type = Preconditions.checkNotNull(type);
 
@@ -78,30 +78,30 @@ public class ValueTypeInfo<T extends Value> extends 
TypeInformation<T> implement
        }
        
        @Override
-       @Experimental
+       @PublicEvolving
        public int getArity() {
                return 1;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getTotalFields() {
                return 1;
        }
        
        @Override
-       @Experimental
+       @PublicEvolving
        public Class<T> getTypeClass() {
                return this.type;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isBasicType() {
                return false;
        }
 
-       @Experimental
+       @PublicEvolving
        public boolean isBasicValueType() {
                return type.equals(StringValue.class) || 
type.equals(ByteValue.class) || type.equals(ShortValue.class) || 
type.equals(CharValue.class) ||
                                type.equals(DoubleValue.class) || 
type.equals(FloatValue.class) || type.equals(IntValue.class) || 
type.equals(LongValue.class) ||
@@ -109,20 +109,20 @@ public class ValueTypeInfo<T extends Value> extends 
TypeInformation<T> implement
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isTupleType() {
                return false;
        }
        
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isKeyType() {
                return Comparable.class.isAssignableFrom(type);
        }
 
        @Override
        @SuppressWarnings("unchecked")
-       @Experimental
+       @PublicEvolving
        public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
                if (CopyableValue.class.isAssignableFrom(type)) {
                        return (TypeSerializer<T>) 
createCopyableValueSerializer(type.asSubclass(CopyableValue.class));
@@ -134,7 +134,7 @@ public class ValueTypeInfo<T extends Value> extends 
TypeInformation<T> implement
        
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
-       @Experimental
+       @PublicEvolving
        public TypeComparator<T> createComparator(boolean sortOrderAscending, 
ExecutionConfig executionConfig) {
                if (!isKeyType()) {
                        throw new RuntimeException("The type " + type.getName() 
+ " is not Comparable.");
@@ -185,7 +185,7 @@ public class ValueTypeInfo<T extends Value> extends 
TypeInformation<T> implement
        
        // 
--------------------------------------------------------------------------------------------
 
-       @Experimental
+       @PublicEvolving
        static <X extends Value> TypeInformation<X> getValueTypeInfo(Class<X> 
typeClass) {
                if (Value.class.isAssignableFrom(typeClass) && 
!typeClass.equals(Value.class)) {
                        return new ValueTypeInfo<X>(typeClass);

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
index 3899f18..5e3b2bc 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.java.typeutils;
 
 import com.google.common.base.Preconditions;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
@@ -44,7 +44,7 @@ public class WritableTypeInfo<T extends Writable> extends 
TypeInformation<T> imp
        
        private final Class<T> typeClass;
 
-       @Experimental
+       @PublicEvolving
        public WritableTypeInfo(Class<T> typeClass) {
                this.typeClass = Preconditions.checkNotNull(typeClass);
 
@@ -55,7 +55,7 @@ public class WritableTypeInfo<T extends Writable> extends 
TypeInformation<T> imp
 
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @Override
-       @Experimental
+       @PublicEvolving
        public TypeComparator<T> createComparator(boolean sortOrderAscending, 
ExecutionConfig executionConfig) {
                if(Comparable.class.isAssignableFrom(typeClass)) {
                        return new WritableComparator(sortOrderAscending, 
typeClass);
@@ -67,43 +67,43 @@ public class WritableTypeInfo<T extends Writable> extends 
TypeInformation<T> imp
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isBasicType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isTupleType() {
                return false;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public int getArity() {
                return 1;
        }
        
        @Override
-       @Experimental
+       @PublicEvolving
        public int getTotalFields() {
                return 1;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public Class<T> getTypeClass() {
                return this.typeClass;
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public boolean isKeyType() {
                return Comparable.class.isAssignableFrom(typeClass);
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
                return new WritableSerializer<T>(typeClass);
        }
@@ -139,7 +139,7 @@ public class WritableTypeInfo<T extends Writable> extends 
TypeInformation<T> imp
        
        // 
--------------------------------------------------------------------------------------------
 
-       @Experimental
+       @PublicEvolving
        static <T extends Writable> TypeInformation<T> 
getWritableTypeInfo(Class<T> typeClass) {
                if (Writable.class.isAssignableFrom(typeClass) && 
!typeClass.equals(Writable.class)) {
                        return new WritableTypeInfo<T>(typeClass);

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index e205bef..bfb97f4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.java;
 
 import com.google.common.base.Preconditions;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -1642,7 +1642,7 @@ public abstract class DataSet<T> {
         * @deprecated Use {@link #printOnTaskManager(String)} instead.
         */
        @Deprecated
-       @Experimental
+       @PublicEvolving
        public DataSink<T> print(String sinkIdentifier) {
                return output(new PrintingOutputFormat<T>(sinkIdentifier, 
false));
        }
@@ -1659,7 +1659,7 @@ public abstract class DataSet<T> {
         *             {@link PrintingOutputFormat} instead.
         */
        @Deprecated
-       @Experimental
+       @PublicEvolving
        public DataSink<T> printToErr(String sinkIdentifier) {
                return output(new PrintingOutputFormat<T>(sinkIdentifier, 
true));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 7fc45b3..512fe42 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -23,7 +23,7 @@ import com.esotericsoftware.kryo.Serializer;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -188,7 +188,7 @@ public abstract class ExecutionEnvironment {
         *
         * @param numberOfExecutionRetries The number of times the system will 
try to re-execute failed tasks.
         */
-       @Experimental
+       @PublicEvolving
        public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
                config.setNumberOfExecutionRetries(numberOfExecutionRetries);
        }
@@ -200,7 +200,7 @@ public abstract class ExecutionEnvironment {
         *
         * @return The number of times the system will try to re-execute failed 
tasks.
         */
-       @Experimental
+       @PublicEvolving
        public int getNumberOfExecutionRetries() {
                return config.getNumberOfExecutionRetries();
        }
@@ -225,7 +225,7 @@ public abstract class ExecutionEnvironment {
         * @return The JobID of this environment.
         * @see #getIdString()
         */
-       @Experimental
+       @PublicEvolving
        public JobID getId() {
                return this.jobID;
        }
@@ -236,7 +236,7 @@ public abstract class ExecutionEnvironment {
         * @return The JobID as a string.
         * @see #getId()
         */
-       @Experimental
+       @PublicEvolving
        public String getIdString() {
                return this.jobID.toString();
        }
@@ -247,7 +247,7 @@ public abstract class ExecutionEnvironment {
         *
         * @param timeout The timeout, in seconds.
         */
-       @Experimental
+       @PublicEvolving
        public void setSessionTimeout(long timeout) {
                throw new IllegalStateException("Support for sessions is 
currently disabled. " +
                                "It will be enabled in future Flink versions.");
@@ -265,7 +265,7 @@ public abstract class ExecutionEnvironment {
         *
         * @return The session timeout, in seconds.
         */
-       @Experimental
+       @PublicEvolving
        public long getSessionTimeout() {
                return sessionTimeout;
        }
@@ -273,7 +273,7 @@ public abstract class ExecutionEnvironment {
        /**
         * Starts a new session, discarding the previous data flow and all of 
its intermediate results.
         */
-       @Experimental
+       @PublicEvolving
        public abstract void startNewSession() throws Exception;
 
        // 
--------------------------------------------------------------------------------------------
@@ -558,7 +558,7 @@ public abstract class ExecutionEnvironment {
         * Creates a {@link DataSet} from the given {@link 
org.apache.hadoop.mapred.FileInputFormat}. The
         * given inputName is set on the given job.
         */
-       @Experimental
+       @PublicEvolving
        public <K,V> DataSource<Tuple2<K, V>> 
readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, 
Class<K> key, Class<V> value, String inputPath, JobConf job) {
                DataSource<Tuple2<K, V>> result = 
createHadoopInput(mapredInputFormat, key, value, job);
 
@@ -571,7 +571,7 @@ public abstract class ExecutionEnvironment {
         * Creates a {@link DataSet} from {@link org.apache.hadoop.mapred.SequenceFileInputFormat}
         * A {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
         */
-       @Experimental
+       @PublicEvolving
        public <K,V> DataSource<Tuple2<K, V>> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
                return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
        }
@@ -580,7 +580,7 @@ public abstract class ExecutionEnvironment {
         * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A
         * {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
         */
-       @Experimental
+       @PublicEvolving
        public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
                return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
        }
@@ -588,7 +588,7 @@ public abstract class ExecutionEnvironment {
        /**
         * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}.
         */
-       @Experimental
+       @PublicEvolving
        public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
                HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<>(mapredInputFormat, key, value, job);
 
@@ -599,7 +599,7 @@ public abstract class ExecutionEnvironment {
         * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The
         * given inputName is set on the given job.
         */
-       @Experimental
+       @PublicEvolving
        public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
                DataSource<Tuple2<K, V>> result = createHadoopInput(mapreduceInputFormat, key, value, job);
 
@@ -613,7 +613,7 @@ public abstract class ExecutionEnvironment {
         * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A
         * {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
         */
-       @Experimental
+       @PublicEvolving
        public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
                return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
        }
@@ -621,7 +621,7 @@ public abstract class ExecutionEnvironment {
        /**
         * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
         */
-       @Experimental
+       @PublicEvolving
        public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
                org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
 
@@ -1072,7 +1072,7 @@ public abstract class ExecutionEnvironment {
         * memory. parallelism will always be 1. This is useful during implementation and for debugging.
         * @return A Collection Environment
         */
-       @Experimental
+       @PublicEvolving
        public static CollectionEnvironment createCollectionsEnvironment(){
                CollectionEnvironment ce = new CollectionEnvironment();
                ce.setParallelism(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index 8c7d6b8..d1fe298 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -109,7 +109,7 @@ public class LocalEnvironment extends ExecutionEnvironment {
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public void startNewSession() throws Exception {
                if (executor != null) {
                        // we need to end the previous session

Reply via email to