Repository: flink
Updated Branches:
  refs/heads/master 4a3151681 -> fa88d9eb1


[FLINK-2862] [Storm Compatibility] FlinkTopologyBuilder should use proper 
generic types

This closes #1274


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa88d9eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa88d9eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa88d9eb

Branch: refs/heads/master
Commit: fa88d9eb1b88641d6ca03b38e2058d30971c7a2f
Parents: 4a31516
Author: mjsax <[email protected]>
Authored: Tue Oct 20 18:06:00 2015 +0200
Committer: mjsax <[email protected]>
Committed: Sat Oct 24 13:04:10 2015 +0200

----------------------------------------------------------------------
 docs/apis/storm_compatibility.md                |   8 +-
 .../storm/api/FlinkOutputFieldsDeclarer.java    |   6 +-
 .../flink/storm/api/FlinkTopologyBuilder.java   | 107 +++++++++++--------
 .../storm/util/SplitStreamTypeKeySelector.java  |  47 --------
 .../flink/storm/wrappers/BoltWrapper.java       |  11 +-
 .../api/FlinkOutputFieldsDeclarerTest.java      |  12 +--
 6 files changed, 71 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
index bf80d4e..e3e11ab 100644
--- a/docs/apis/storm_compatibility.md
+++ b/docs/apis/storm_compatibility.md
@@ -209,7 +209,6 @@ If a whole topology is executed in Flink using 
`FlinkTopologyBuilder` etc., ther
 For embedded usage, the output stream will be of data type 
`SplitStreamType<T>` and must be split by using `DataStream.split(...)` and 
`SplitStream.select(...)`.
 Flink provides the predefined output selector `StormStreamSelector<T>` for 
`.split(...)` already.
 Furthermore, the wrapper type `SplitStreamTuple<T>` can be removed using 
`SplitStreamMapper<T>`.
-If a data stream of type `SplitStreamTuple<T>` is used as input for a Bolt, it 
is **not** required to strip the wrapper &ndash; `BoltWrapper` removes it 
automatically.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -221,10 +220,9 @@ DataStream<SplitStreamType<SomeType>> multiStream = ...
 
 SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new 
StormStreamSelector<SomeType>());
 
-// remove SplitStreamMapper to get data stream of type SomeType
-DataStream<SomeType> s1 = splitStream.select("s1").map(new 
SplitStreamMapper<SomeType>).returns(SomeType.classs);
-// apply Bolt directly, without stripping SplitStreamType
-DataStream<BoltOutputType> s2 = splitStream.select("s2").transform(/* use Bolt 
for further processing */);
+// remove SplitStreamType using SplitStreamMapper to get data stream of type 
SomeType
+DataStream<SomeType> s1 = splitStream.select("s1").map(new 
SplitStreamMapper<SomeType>()).returns(SomeType.classs);
+DataStream<SomeType> s2 = splitStream.select("s2").map(new 
SplitStreamMapper<SomeType>()).returns(SomeType.classs);
 
 // do further processing on s1 and s2
 [...]

http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
index 88d2dfe..febd56d 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
@@ -91,7 +91,7 @@ final class FlinkOutputFieldsDeclarer implements 
OutputFieldsDeclarer {
         * @throws IllegalArgumentException
         *             If no output schema was declared for the specified 
stream or if more then 25 attributes got declared.
         */
-       TypeInformation<?> getOutputType(final String streamId) throws 
IllegalArgumentException {
+       TypeInformation<Tuple> getOutputType(final String streamId) throws 
IllegalArgumentException {
                if (streamId == null) {
                        return null;
                }
@@ -105,9 +105,7 @@ final class FlinkOutputFieldsDeclarer implements 
OutputFieldsDeclarer {
                Tuple t;
                final int numberOfAttributes = outputSchema.size();
 
-               if (numberOfAttributes == 1) {
-                       return TypeExtractor.getForClass(Object.class);
-               } else if (numberOfAttributes <= 25) {
+               if (numberOfAttributes <= 25) {
                        try {
                                t = 
Tuple.getTupleClass(numberOfAttributes).newInstance();
                        } catch (final InstantiationException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
index 9c41d88..8a88eac 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
@@ -33,9 +33,10 @@ import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.util.SplitStreamMapper;
 import org.apache.flink.storm.util.SplitStreamType;
-import org.apache.flink.storm.util.SplitStreamTypeKeySelector;
 import org.apache.flink.storm.util.StormStreamSelector;
 import org.apache.flink.storm.wrappers.BoltWrapper;
 import org.apache.flink.storm.wrappers.SpoutWrapper;
@@ -77,14 +78,13 @@ public class FlinkTopologyBuilder {
        /**
         * Creates a Flink program that uses the specified spouts and bolts.
         */
-       @SuppressWarnings({"rawtypes", "unchecked"})
        public FlinkTopology createTopology() {
                this.stormTopology = this.stormBuilder.createTopology();
 
                final FlinkTopology env = new FlinkTopology();
                env.setParallelism(1);
 
-               final HashMap<String, HashMap<String, DataStream>> 
availableInputs = new HashMap<String, HashMap<String, DataStream>>();
+               final HashMap<String, HashMap<String, DataStream<Tuple>>> 
availableInputs = new HashMap<String, HashMap<String, DataStream<Tuple>>>();
 
                for (final Entry<String, IRichSpout> spout : 
this.spouts.entrySet()) {
                        final String spoutId = spout.getKey();
@@ -96,24 +96,37 @@ public class FlinkTopologyBuilder {
                        this.outputStreams.put(spoutId, sourceStreams);
                        declarers.put(spoutId, declarer);
 
-                       final SpoutWrapper spoutWrapper = new 
SpoutWrapper(userSpout);
-                       spoutWrapper.setStormTopology(stormTopology);
 
-                       DataStreamSource source;
-                       final HashMap<String, DataStream> outputStreams = new 
HashMap<String, DataStream>();
+                       final HashMap<String, DataStream<Tuple>> outputStreams 
= new HashMap<String, DataStream<Tuple>>();
+                       final DataStreamSource<?> source;
+
                        if (sourceStreams.size() == 1) {
+                               final SpoutWrapper<Tuple> 
spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout);
+                               
spoutWrapperSingleOutput.setStormTopology(stormTopology);
+
                                final String outputStreamId = (String) 
sourceStreams.keySet().toArray()[0];
-                               source = env.addSource(spoutWrapper, spoutId,
+
+                               DataStreamSource<Tuple> src = 
env.addSource(spoutWrapperSingleOutput, spoutId,
                                                
declarer.getOutputType(outputStreamId));
-                               outputStreams.put(outputStreamId, source);
+
+                               outputStreams.put(outputStreamId, src);
+                               source = src;
                        } else {
-                               source = env.addSource(spoutWrapper, spoutId,
-                                               
TypeExtractor.getForClass(SplitStreamType.class));
-                               SplitStream splitSource = source.split(new 
StormStreamSelector());
+                               final SpoutWrapper<SplitStreamType<Tuple>> 
spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
+                                               userSpout);
+                               
spoutWrapperMultipleOutputs.setStormTopology(stormTopology);
+
+                               @SuppressWarnings({ "unchecked", "rawtypes" })
+                               DataStreamSource<SplitStreamType<Tuple>> 
multiSource = env.addSource(
+                                               spoutWrapperMultipleOutputs, 
spoutId,
+                                               (TypeInformation) 
TypeExtractor.getForClass(SplitStreamType.class));
 
+                               SplitStream<SplitStreamType<Tuple>> splitSource 
= multiSource
+                                               .split(new 
StormStreamSelector<Tuple>());
                                for (String streamId : sourceStreams.keySet()) {
-                                       outputStreams.put(streamId, 
splitSource.select(streamId));
+                                       outputStreams.put(streamId, 
splitSource.select(streamId).map(new SplitStreamMapper<Tuple>()));
                                }
+                               source = multiSource;
                        }
                        availableInputs.put(spoutId, outputStreams);
 
@@ -171,11 +184,11 @@ public class FlinkTopologyBuilder {
                                        final String producerId = 
stormInputStream.getKey().get_componentId();
                                        final String inputStreamId = 
stormInputStream.getKey().get_streamId();
 
-                                       final HashMap<String, DataStream> 
producer = availableInputs.get(producerId);
+                                       final HashMap<String, 
DataStream<Tuple>> producer = availableInputs.get(producerId);
                                        if (producer != null) {
                                                makeProgress = true;
 
-                                               DataStream inputStream = 
producer.get(inputStreamId);
+                                               DataStream<Tuple> inputStream = 
producer.get(inputStreamId);
                                                if (inputStream != null) {
                                                        final 
FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
                                                        
userBolt.declareOutputFields(declarer);
@@ -193,18 +206,9 @@ public class FlinkTopologyBuilder {
                                                                final 
List<String> fields = grouping.get_fields();
                                                                if 
(fields.size() > 0) {
                                                                        
FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
-                                                                       if 
(producer.size() == 1) {
-                                                                               
inputStream = inputStream.keyBy(prodDeclarer
-                                                                               
                .getGroupingFieldIndexes(inputStreamId,
-                                                                               
                                grouping.get_fields()));
-                                                                       } else {
-                                                                               
inputStream = inputStream
-                                                                               
                .keyBy(new SplitStreamTypeKeySelector(
-                                                                               
                                inputStream.getType(),
-                                                                               
                                prodDeclarer.getGroupingFieldIndexes(
-                                                                               
                                                inputStreamId,
-                                                                               
                                                grouping.get_fields())));
-                                                                       }
+                                                                       
inputStream = inputStream.keyBy(prodDeclarer
+                                                                               
        .getGroupingFieldIndexes(inputStreamId,
+                                                                               
                        grouping.get_fields()));
                                                                } else {
                                                                        
inputStream = inputStream.global();
                                                                }
@@ -215,43 +219,56 @@ public class FlinkTopologyBuilder {
                                                                                
"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
                                                        }
 
-                                                       final 
SingleOutputStreamOperator outputStream;
-                                                       final BoltWrapper 
boltWrapper;
+                                                       final 
SingleOutputStreamOperator<?, ?> outputStream;
+
                                                        if 
(boltOutputStreams.size() < 2) { // single output stream or sink
                                                                String 
outputStreamId = null;
                                                                if 
(boltOutputStreams.size() == 1) {
                                                                        
outputStreamId = (String) boltOutputStreams.keySet().toArray()[0];
                                                                }
-                                                               final 
TypeInformation<?> outType = declarer
+                                                               final 
TypeInformation<Tuple> outType = declarer
                                                                                
.getOutputType(outputStreamId);
 
-                                                               boltWrapper = 
new BoltWrapper(userBolt, this.outputStreams
-                                                                               
.get(producerId).get(inputStreamId));
-                                                               outputStream = 
inputStream.transform(boltId, outType, boltWrapper);
+                                                               final 
BoltWrapper<Tuple, Tuple> boltWrapperSingleOutput = new BoltWrapper<Tuple, 
Tuple>(
+                                                                               
userBolt, this.outputStreams.get(producerId).get(
+                                                                               
                inputStreamId));
+                                                               
boltWrapperSingleOutput.setStormTopology(stormTopology);
+
+                                                               final 
SingleOutputStreamOperator<Tuple, ?> outStream = inputStream
+                                                                               
.transform(boltId, outType, boltWrapperSingleOutput);
 
                                                                if (outType != 
null) {
                                                                        // only 
for non-sink nodes
-                                                                       final 
HashMap<String, DataStream> op = new HashMap<String, DataStream>();
-                                                                       
op.put(outputStreamId, outputStream);
+                                                                       final 
HashMap<String, DataStream<Tuple>> op = new HashMap<String, 
DataStream<Tuple>>();
+                                                                       
op.put(outputStreamId, outStream);
                                                                        
availableInputs.put(boltId, op);
                                                                }
+                                                               outputStream = 
outStream;
                                                        } else {
-                                                               final 
TypeInformation<?> outType = TypeExtractor
+                                                               final 
BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new 
BoltWrapper<Tuple, SplitStreamType<Tuple>>(
+                                                                               
userBolt, this.outputStreams.get(producerId).get(
+                                                                               
                inputStreamId));
+                                                               
boltWrapperMultipleOutputs.setStormTopology(stormTopology);
+
+                                                               
@SuppressWarnings({ "unchecked", "rawtypes" })
+                                                               final 
TypeInformation<SplitStreamType<Tuple>> outType = (TypeInformation) 
TypeExtractor
                                                                                
.getForClass(SplitStreamType.class);
 
-                                                               boltWrapper = 
new BoltWrapper(userBolt, 
this.outputStreams.get(producerId).get(inputStreamId));
-                                                               outputStream = 
inputStream.transform(boltId, outType, boltWrapper);
+                                                               final 
SingleOutputStreamOperator<SplitStreamType<Tuple>, ?> multiStream = inputStream
+                                                                               
.transform(boltId, outType, boltWrapperMultipleOutputs);
 
-                                                               final 
SplitStream splitStreams = outputStream
-                                                                               
.split(new StormStreamSelector());
+                                                               final 
SplitStream<SplitStreamType<Tuple>> splitStream = multiStream
+                                                                               
.split(new StormStreamSelector<Tuple>());
 
-                                                               final 
HashMap<String, DataStream> op = new HashMap<String, DataStream>();
+                                                               final 
HashMap<String, DataStream<Tuple>> op = new HashMap<String, 
DataStream<Tuple>>();
                                                                for (String 
outputStreamId : boltOutputStreams.keySet()) {
-                                                                       
op.put(outputStreamId, splitStreams.select(outputStreamId));
+                                                                       
op.put(outputStreamId,
+                                                                               
        splitStream.select(outputStreamId).map(
+                                                                               
                        new SplitStreamMapper<Tuple>()));
                                                                }
                                                                
availableInputs.put(boltId, op);
+                                                               outputStream = 
multiStream;
                                                        }
-                                                       
boltWrapper.setStormTopology(stormTopology);
 
                                                        int dop = 1;
                                                        if 
(common.is_set_parallelism_hint()) {
@@ -342,7 +359,7 @@ public class FlinkTopologyBuilder {
         *              the basic bolt
         * @param parallelism_hint
         *              the number of tasks that should be assigned to execute 
this bolt. Each task will run on a thread in a
-        *              process somwehere around the cluster.
+        *              process somewhere around the cluster.
         * @return use the returned object to declare the inputs to this 
component
         */
        public BoltDeclarer setBolt(final String id, final IBasicBolt bolt, 
final Number parallelism_hint) {
@@ -371,7 +388,7 @@ public class FlinkTopologyBuilder {
         *              outputs.
         * @param parallelism_hint
         *              the number of tasks that should be assigned to execute 
this spout. Each task will run on a thread in a
-        *              process somwehere around the cluster.
+        *              process somewhere around the cluster.
         * @param spout
         *              the spout
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
deleted file mode 100644
index 71e5b86..0000000
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.storm.util;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector;
-
-/**
- * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for 
streams that are selected via
- * {@link StormStreamSelector} from a Spout or Bolt that declares multiple 
output streams.
- * 
- * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} 
tuples and applies a regular
- * {@link ArrayKeySelector} on it.
- */
-public class SplitStreamTypeKeySelector implements 
KeySelector<SplitStreamType<Tuple>, Tuple> {
-       private static final long serialVersionUID = 4672434660037669254L;
-
-       private final ArrayKeySelector<Tuple> selector;
-
-       public SplitStreamTypeKeySelector(TypeInformation<Tuple> type, int... 
fields) {
-               this.selector = KeySelectorUtil.getSelectorForArray(fields, 
type);
-       }
-
-       @Override
-       public Tuple getKey(SplitStreamType<Tuple> value) throws Exception {
-               return selector.getKey(value.value);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
index f0913e8..12d967a 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.storm.util.SplitStreamType;
 import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -61,7 +60,7 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
        private final Fields inputSchema;
        /** The original Storm topology. */
        protected StormTopology stormTopology;
-       
+
        /**
         *  We have to use this because Operators must output
         *  {@link 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
@@ -239,17 +238,11 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
                this.bolt.cleanup();
        }
 
-       @SuppressWarnings("unchecked")
        @Override
        public void processElement(final StreamRecord<IN> element) throws 
Exception {
                this.flinkCollector.setTimestamp(element.getTimestamp());
                IN value = element.getValue();
-               if (value instanceof SplitStreamType) {
-                       this.bolt.execute(new 
StormTuple<IN>(((SplitStreamType<IN>) value).value,
-                                       inputSchema));
-               } else {
-                       this.bolt.execute(new StormTuple<IN>(value, 
inputSchema));
-               }
+               this.bolt.execute(new StormTuple<IN>(value, inputSchema));
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
index 49de476..8f0ad3b 100644
--- 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
@@ -20,7 +20,6 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.storm.api.FlinkOutputFieldsDeclarer;
 import org.apache.flink.storm.util.AbstractTest;
 import org.junit.Assert;
@@ -30,8 +29,6 @@ import java.util.LinkedList;
 
 public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
 
-
-
        @Test
        public void testNull() {
                Assert.assertNull(new 
FlinkOutputFieldsDeclarer().getOutputType(null));
@@ -100,13 +97,8 @@ public class FlinkOutputFieldsDeclarerTest extends 
AbstractTest {
                for (String stream : streams) {
                        final TypeInformation<?> type = 
declarer.getOutputType(stream);
 
-                       if (numberOfAttributes == 1) {
-                               Assert.assertEquals(type.getClass(), 
GenericTypeInfo.class);
-                               Assert.assertEquals(type.getTypeClass(), 
Object.class);
-                       } else {
-                               Assert.assertEquals(numberOfAttributes, 
type.getArity());
-                               Assert.assertTrue(type.isTupleType());
-                       }
+                       Assert.assertEquals(numberOfAttributes, 
type.getArity());
+                       Assert.assertTrue(type.isTupleType());
                }
        }
 

Reply via email to