Repository: incubator-streams
Updated Branches:
  refs/heads/master 571d4066e -> b71cce83d


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/provider/YoutubeProviderTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/provider/YoutubeProviderTest.java
 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/provider/YoutubeProviderTest.java
new file mode 100644
index 0000000..9a7e547
--- /dev/null
+++ 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/provider/YoutubeProviderTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.youtube.YoutubeConfiguration;
+
+import com.google.api.services.youtube.YouTube;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for YoutubeProvider.
+ */
+public class YoutubeProviderTest {
+
+  /**
+   * Verifies that every collector is run and that the data the collectors
+   * queue is read back from the provider.
+   *
+   * <p>The number of users is drawn at random (0-999) on each run. The stub
+   * collector supplied by {@code buildProvider} queues one datum each time it
+   * runs, so the number of datums read is expected to equal the number of users.
+   */
+  @Test
+  public void testDataCollectorRunsPerUser() {
+    final int expectedDatums = new Random(System.currentTimeMillis()).nextInt(1000);
+
+    List<UserInfo> users = new LinkedList<>();
+    while (users.size() < expectedDatums) {
+      users.add(new UserInfo());
+    }
+
+    YoutubeConfiguration configuration = new YoutubeConfiguration();
+    configuration.setYoutubeUsers(users);
+    configuration.setApiKey("API_KEY");
+
+    YoutubeProvider provider = buildProvider(configuration);
+    try {
+      provider.prepare(null);
+      provider.startStream();
+
+      int readSoFar = 0;
+      // Keep reading for as long as the provider reports that it is running.
+      while (provider.isRunning()) {
+        readSoFar += provider.readCurrent().size();
+      }
+
+      assertEquals(expectedDatums, readSoFar);
+    } finally {
+      provider.cleanUp();
+    }
+  }
+
+  /**
+   * Verifies that the provider returns the configuration it was built with and
+   * that {@code setConfig} replaces it.
+   */
+  @Test
+  public void testConfigSetterGetter() {
+    YoutubeConfiguration config = new YoutubeConfiguration();
+    config.setApiKey("API_KEY");
+    config.setVersion("fake_version_1");
+
+    // The replacement carries its own version so the two configurations differ.
+    YoutubeConfiguration newConfig = new YoutubeConfiguration();
+    newConfig.setApiKey("API_KEY");
+    newConfig.setVersion("fake_version_2");
+
+    YoutubeProvider provider = buildProvider(config);
+
+    // assertEquals takes (expected, actual)
+    assertEquals(config, provider.getConfig());
+
+    provider.setConfig(newConfig);
+    assertEquals(newConfig, provider.getConfig());
+  }
+
+  /**
+   * Verifies that {@code setUserInfoWithDefaultDates} stamps every configured
+   * user with the provider's default after/before dates.
+   */
+  @Test
+  public void testUserInfoWithDefaultDates() {
+    YoutubeConfiguration config = new YoutubeConfiguration();
+    config.setApiKey("API_KEY");
+    YoutubeProvider provider = buildProvider(config);
+
+    DateTime afterDate = new DateTime(System.currentTimeMillis());
+    DateTime beforeDate = afterDate.minus(10000);
+
+    provider.setDefaultAfterDate(afterDate);
+    provider.setDefaultBeforeDate(beforeDate);
+
+    Set<String> users = new HashSet<>();
+    users.add("test_user_1");
+    users.add("test_user_2");
+    users.add("test_user_3");
+
+    provider.setUserInfoWithDefaultDates(users);
+
+    List<UserInfo> youtubeUsers = provider.getConfig().getYoutubeUsers();
+
+    // Use JUnit assertions: the `assert` keyword is skipped entirely when the
+    // JVM runs without -ea, which would let this test pass without checking anything.
+    for (UserInfo user : youtubeUsers) {
+      assertEquals(afterDate, user.getAfterDate());
+      assertEquals(beforeDate, user.getBeforeDate());
+    }
+  }
+
+  /**
+   * Verifies that {@code setUserInfoWithAfterDate} gives each configured user
+   * the after date that was supplied for that user id.
+   */
+  @Test
+  public void testUserInfoWithAfterDate() {
+    YoutubeConfiguration config = new YoutubeConfiguration();
+    config.setApiKey("API_KEY");
+    YoutubeProvider provider = buildProvider(config);
+
+    Map<String, DateTime> users = new HashMap<>();
+    users.put("user1", new DateTime(System.currentTimeMillis()));
+    users.put("user3", new DateTime(System.currentTimeMillis()));
+    users.put("user4", new DateTime(System.currentTimeMillis()));
+
+    provider.setUserInfoWithAfterDate(users);
+
+    List<UserInfo> youtubeUsers = provider.getConfig().getYoutubeUsers();
+
+    // Use a JUnit assertion: the `assert` keyword is skipped entirely when the
+    // JVM runs without -ea.
+    for (UserInfo user : youtubeUsers) {
+      assertEquals(users.get(user.getUserId()), user.getAfterDate());
+    }
+  }
+
+  /**
+   * Builds a {@link YoutubeProvider} whose YouTube client is a Mockito mock and
+   * whose data collectors queue a single {@code StreamsDatum} constructed from
+   * {@code null} each time they run.
+   *
+   * @param config configuration handed to the provider
+   * @return the stubbed provider
+   */
+  private YoutubeProvider buildProvider(YoutubeConfiguration config) {
+    return new YoutubeProvider(config) {
+
+      @Override
+      protected YouTube createYouTubeClient() throws IOException {
+        return mock(YouTube.class);
+      }
+
+      @Override
+      protected Runnable getDataCollector(BackOffStrategy strategy,
+                                          BlockingQueue<StreamsDatum> queue,
+                                          YouTube youtube,
+                                          UserInfo userInfo) {
+        // The queue parameter is effectively final, so the lambda can capture it directly.
+        return () -> {
+          try {
+            queue.put(new StreamsDatum(null));
+          } catch (InterruptedException interrupted) {
+            fail("Test was interrupted");
+          }
+        };
+      }
+    };
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/provider/YoutubeUserActivityCollectorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/provider/YoutubeUserActivityCollectorTest.java
 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/provider/YoutubeUserActivityCollectorTest.java
new file mode 100644
index 0000000..1d92a3e
--- /dev/null
+++ 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/provider/YoutubeUserActivityCollectorTest.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.local.queues.ThroughputQueue;
+import 
org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+import org.apache.streams.youtube.YoutubeConfiguration;
+
+import com.google.api.services.youtube.YouTube;
+import com.google.api.services.youtube.model.Activity;
+import com.google.api.services.youtube.model.ActivityContentDetails;
+import com.google.api.services.youtube.model.ActivityContentDetailsUpload;
+import com.google.api.services.youtube.model.ActivityListResponse;
+import com.google.api.services.youtube.model.Video;
+import com.google.api.services.youtube.model.VideoListResponse;
+import com.google.api.services.youtube.model.VideoSnippet;
+import org.joda.time.DateTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for YoutubeUserActivityCollector.
+ */
+public class YoutubeUserActivityCollectorTest {
+  private static final String USER_ID = "fake_user_id";
+  private static final String IN_RANGE_IDENTIFIER = "data in range";
+  private YoutubeConfiguration config;
+
+  private static VideoListResponse createMockVideoListResponse(int numBefore, 
int numAfter, int numInRange, DateTime after, DateTime before, boolean page) {
+    VideoListResponse feed = new VideoListResponse();
+    List<Video> list = new LinkedList<>();
+
+    for (int i = 0; i < numAfter; ++i) {
+      com.google.api.client.util.DateTime published = new 
com.google.api.client.util.DateTime(after.getMillis() + 1000000);
+      Video video = new Video();
+      video.setSnippet(new VideoSnippet());
+      video.getSnippet().setPublishedAt(published);
+      list.add(video);
+    }
+    for (int i = 0; i < numInRange; ++i) {
+      DateTime published;
+      if ((before == null && after == null) || before == null) {
+        published = DateTime.now(); // no date range or end time date range so 
just make the time now.
+      } else if (after == null) {
+        published = before.minusMillis(100000); //no beginning to range
+      } else { // has to be in range
+        long range = before.getMillis() - after.getMillis();
+        published = after.plus(range / 2); //in the middle
+      }
+      com.google.api.client.util.DateTime ytPublished = new 
com.google.api.client.util.DateTime(published.getMillis());
+      Video video = new Video();
+      video.setSnippet(new VideoSnippet());
+      video.getSnippet().setPublishedAt(ytPublished);
+      video.getSnippet().setTitle(IN_RANGE_IDENTIFIER);
+      list.add(video);
+    }
+    for (int i = 0; i < numBefore; ++i) {
+      com.google.api.client.util.DateTime published = new 
com.google.api.client.util.DateTime((after.minusMillis(100000)).getMillis());
+      Video video = new Video();
+      video.setSnippet(new VideoSnippet());
+      video.getSnippet().setPublishedAt(published);
+      list.add(video);
+    }
+    if (page) {
+      feed.setNextPageToken("A");
+    } else {
+      feed.setNextPageToken(null);
+    }
+
+    feed.setItems(list);
+
+    return feed;
+  }
+
+  @Before
+  public void setup() {
+    this.config = new YoutubeConfiguration();
+    this.config.setApiKey("API_KEY");
+  }
+
+  /**
+   * Verifies that {@code getVideoList} yields a non-null first element when the
+   * mocked client has one video to serve.
+   */
+  @Test
+  public void testGetVideos() throws IOException {
+    DateTime after = new DateTime(System.currentTimeMillis());
+    DateTime before = after.minus(10000);
+    YouTube youtube = buildYouTube(0, 1, 0, after, before);
+
+    BlockingQueue<StreamsDatum> queue = new ThroughputQueue<>();
+    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(
+        youtube,
+        queue,
+        new ExponentialBackOffStrategy(2),
+        new UserInfo().withUserId(USER_ID),
+        this.config);
+
+    List<Video> videos = collector.getVideoList("fake_video_id");
+
+    assertNotNull(videos.get(0));
+  }
+
+  /**
+   * Verifies that {@code getVideoList} returns an empty list when the mocked
+   * client serves no videos.
+   */
+  @Test
+  public void testGetVideosNull() throws IOException {
+    DateTime now = new DateTime(System.currentTimeMillis());
+    YouTube youtube = buildYouTube(0, 0, 0, now.plus(10000), now.minus(10000));
+
+    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
+    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(
+        youtube, datumQueue, new ExponentialBackOffStrategy(2),
+        new UserInfo().withUserId(USER_ID), this.config);
+
+    List<Video> video = collector.getVideoList("fake_video_id");
+
+    // assertEquals takes (expected, actual); the reversed order gives misleading failure messages
+    assertEquals(0, video.size());
+  }
+
+  /**
+   * Verifies that all five in-range videos served by the mocked client are
+   * queued when a one-item activity feed is processed.
+   */
+  @Test
+  public void testProcessActivityFeed() throws IOException, InterruptedException {
+    DateTime now = new DateTime(System.currentTimeMillis());
+    YouTube youtube = buildYouTube(0, 0, 5, now.plus(3000000), now.minus(1000000));
+
+    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
+    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(
+        youtube, datumQueue, new ExponentialBackOffStrategy(2),
+        new UserInfo().withUserId(USER_ID), this.config);
+
+    ActivityListResponse feed = buildActivityListResponse(1);
+
+    collector.processActivityFeed(feed, new DateTime(System.currentTimeMillis()), null);
+
+    // assertEquals takes (expected, actual)
+    assertEquals(5, collector.getDatumQueue().size());
+  }
+
+  /**
+   * Verifies that nothing is queued when the mocked client only serves videos
+   * published 100,000 ms earlier than the reference date.
+   */
+  @Test
+  public void testProcessActivityFeedBefore() throws IOException, InterruptedException {
+    DateTime now = new DateTime(System.currentTimeMillis());
+    YouTube youtube = buildYouTube(5, 0, 0, now, now);
+
+    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
+    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(
+        youtube, datumQueue, new ExponentialBackOffStrategy(2),
+        new UserInfo().withUserId(USER_ID), this.config);
+
+    ActivityListResponse feed = buildActivityListResponse(1);
+
+    collector.processActivityFeed(feed, new DateTime(System.currentTimeMillis()), null);
+
+    // assertEquals takes (expected, actual)
+    assertEquals(0, collector.getDatumQueue().size());
+  }
+
+  /**
+   * Verifies that all five videos are queued when the mocked client only
+   * serves videos published 1,000,000 ms later than the reference date.
+   */
+  @Test
+  public void testProcessActivityFeedAfter() throws IOException, InterruptedException {
+    DateTime now = new DateTime(System.currentTimeMillis());
+    YouTube youtube = buildYouTube(0, 5, 0, now, now);
+
+    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
+    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(
+        youtube, datumQueue, new ExponentialBackOffStrategy(2),
+        new UserInfo().withUserId(USER_ID), this.config);
+
+    ActivityListResponse feed = buildActivityListResponse(1);
+
+    collector.processActivityFeed(feed, new DateTime(now.getMillis() - 100000), null);
+
+    // assertEquals takes (expected, actual)
+    assertEquals(5, collector.getDatumQueue().size());
+  }
+
+  /**
+   * Verifies that only five of the fifteen videos served by the mocked client
+   * (five "before", five "after", five in-range) are queued when the feed is
+   * processed with {@code now} and a null second date.
+   */
+  @Test
+  public void testProcessActivityFeedMismatchCount() throws IOException, InterruptedException {
+    DateTime now = new DateTime(System.currentTimeMillis());
+    YouTube youtube = buildYouTube(5, 5, 5, now, now.minus(100000));
+
+    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
+    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(
+        youtube, datumQueue, new ExponentialBackOffStrategy(2),
+        new UserInfo().withUserId(USER_ID), this.config);
+
+    ActivityListResponse feed = buildActivityListResponse(1);
+
+    collector.processActivityFeed(feed, new DateTime(now), null);
+
+    // assertEquals takes (expected, actual)
+    assertEquals(5, collector.getDatumQueue().size());
+  }
+
+  /**
+   * Verifies that only five of the fifteen videos served by the mocked client
+   * (five "before", five "after", five in-range) are queued when the feed is
+   * processed with both dates supplied.
+   */
+  @Test
+  public void testProcessActivityFeedMismatchCountInRange() throws IOException, InterruptedException {
+    DateTime now = new DateTime(System.currentTimeMillis());
+    YouTube youtube = buildYouTube(5, 5, 5, now, now.minus(100000));
+
+    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
+    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(
+        youtube, datumQueue, new ExponentialBackOffStrategy(2),
+        new UserInfo().withUserId(USER_ID), this.config);
+
+    ActivityListResponse feed = buildActivityListResponse(1);
+
+    collector.processActivityFeed(feed, new DateTime(now), new DateTime(now).minus(100000));
+
+    // assertEquals takes (expected, actual)
+    assertEquals(5, collector.getDatumQueue().size());
+  }
+
+  /**
+   * Builds an activity feed of {@code num} upload activities, with ids
+   * {@code id_0..} and video ids {@code video_id_0..}.
+   *
+   * @param num number of activities to create
+   * @return the populated feed
+   */
+  private ActivityListResponse buildActivityListResponse(int num) {
+    List<Activity> activities = new ArrayList<>();
+
+    for (int index = 0; index < num; index++) {
+      ActivityContentDetailsUpload upload = new ActivityContentDetailsUpload();
+      upload.setVideoId("video_id_" + index);
+
+      ActivityContentDetails details = new ActivityContentDetails();
+      details.setUpload(upload);
+
+      Activity activity = new Activity();
+      activity.setId("id_" + index);
+      activity.setContentDetails(details);
+
+      activities.add(activity);
+    }
+
+    ActivityListResponse response = new ActivityListResponse();
+    response.setItems(activities);
+    return response;
+  }
+
+  /**
+   * Convenience entry point for the tests; forwards every argument unchanged to
+   * {@code createYoutubeMock}.
+   */
+  private YouTube buildYouTube(int numBeforeRange, int numAfterRange, int numInRange,
+      DateTime afterDate, DateTime beforeDate) {
+    return createYoutubeMock(numBeforeRange, numAfterRange, numInRange, afterDate, beforeDate);
+  }
+
+  /**
+   * Creates a mocked {@link YouTube} client whose {@code videos()} call returns
+   * the mocked videos resource built from the given counts and dates.
+   */
+  private YouTube createYoutubeMock(int numBefore, int numAfter, int numInRange,
+      DateTime after, DateTime before) {
+    YouTube.Videos mockedVideos = createMockVideos(numBefore, numAfter, numInRange, after, before);
+
+    YouTube client = mock(YouTube.class);
+    doAnswer(invocation -> mockedVideos).when(client).videos();
+    return client;
+  }
+
+  /**
+   * Creates a mocked {@code YouTube.Videos} resource whose {@code list(...)}
+   * call returns the mocked list request for any string argument.
+   */
+  private YouTube.Videos createMockVideos(int numBefore, int numAfter, int numInRange,
+      DateTime after, DateTime before) {
+    YouTube.Videos mockedVideos = mock(YouTube.Videos.class);
+    YouTube.Videos.List listRequest = createMockVideosList(numBefore, numAfter, numInRange, after, before);
+
+    try {
+      when(mockedVideos.list(anyString())).thenReturn(listRequest);
+    } catch (IOException ex) {
+      fail("Exception thrown while creating mock");
+    }
+
+    return mockedVideos;
+  }
+
+  /**
+   * Creates a mocked {@code YouTube.Videos.List} request: the fluent setters
+   * return the mock itself and {@code execute()} is answered by a
+   * {@code VideoListResponseAnswer} built from the given counts and dates.
+   */
+  private YouTube.Videos.List createMockVideosList(int numBefore, int numAfter, int numInRange,
+      DateTime after, DateTime before) {
+    YouTube.Videos.List request = mock(YouTube.Videos.List.class);
+
+    // Fluent setters hand back the same mocked request.
+    when(request.setMaxResults(anyLong())).thenReturn(request);
+    when(request.setPageToken(anyString())).thenReturn(request);
+    when(request.setId(anyString())).thenReturn(request);
+    when(request.setKey(anyString())).thenReturn(request);
+
+    VideoListResponseAnswer responseAnswer =
+        new VideoListResponseAnswer(numBefore, numAfter, numInRange, after, before);
+    try {
+      doAnswer(responseAnswer).when(request).execute();
+    } catch (IOException ioe) {
+      fail("Should not have thrown exception while creating mock. : " + ioe.getMessage());
+    }
+
+    return request;
+  }
+
+  /**
+   * Mockito answer that produces the {@link VideoListResponse} for a mocked
+   * {@code execute()} call, capping one response at {@code maxBatch} videos.
+   *
+   * <p>The running counters are reset on every invocation, so each call starts
+   * from scratch: "after" videos fill the batch first, then in-range videos,
+   * then "before" videos. The response advertises a next page whenever any of
+   * the three requested counts did not fit into the batch.
+   */
+  private static class VideoListResponseAnswer implements Answer<VideoListResponse> {
+    private int afterCount = 0;
+    private int beforeCount = 0;
+    private int inCount = 0;
+    private int maxBatch = 100;
+
+    private int numAfter;
+    private int numInRange;
+    private int numBefore;
+    private DateTime after;
+    private DateTime before;
+
+    private VideoListResponseAnswer(int numBefore, int numAfter, int numInRange,
+        DateTime after, DateTime before) {
+      this.numBefore = numBefore;
+      this.numAfter = numAfter;
+      this.numInRange = numInRange;
+      this.after = after;
+      this.before = before;
+    }
+
+    @Override
+    public VideoListResponse answer(InvocationOnMock invocationOnMock) throws Throwable {
+      // "After" videos come first, limited to one batch.
+      afterCount = Math.min(numAfter, maxBatch);
+      int served = afterCount;
+
+      // In-range videos take whatever room is left in the batch.
+      inCount = 0;
+      if (served < maxBatch) {
+        inCount = Math.min(numInRange, maxBatch - served);
+        served += inCount;
+      }
+
+      // "Before" videos fill the remainder, if any.
+      beforeCount = 0;
+      if (served < maxBatch) {
+        beforeCount = Math.min(numBefore, maxBatch - served);
+      }
+
+      boolean morePages = numAfter != afterCount || inCount != numInRange || beforeCount != numBefore;
+      return createMockVideoListResponse(beforeCount, afterCount, inCount, after, before, morePages);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/serializer/YoutubeEventClassifierTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/serializer/YoutubeEventClassifierTest.java
 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/serializer/YoutubeEventClassifierTest.java
new file mode 100644
index 0000000..d473abe
--- /dev/null
+++ 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/serializer/YoutubeEventClassifierTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.serializer;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.api.services.youtube.model.Video;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class YoutubeEventClassifierTest {
+
+  private final String testVideo = 
"{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#video\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion
 Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a 
LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart 
failure, treatment options, and lifestyle changes. Learn more: 
https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be
 Heart Smart: Congestive Heart Failure LIVE 
Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}";
+  private final String testObjectNode = 
"{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#somethingElse\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion
 Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a 
LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart 
failure, treatment options, and lifestyle changes. Learn more: 
https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be
 Heart Smart: Congestive Heart Failure LIVE 
Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}";
+
+  @Test
+  public void testVideoClassification() {
+    Class klass = YoutubeEventClassifier.detectClass(testVideo);
+
+    assertEquals(klass, Video.class);
+  }
+
+  /**
+   * An empty payload is expected to be rejected with an {@link IllegalArgumentException}.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void testExceptionClassification() {
+    YoutubeEventClassifier.detectClass("");
+  }
+
+  @Test
+  public void testObjectNodeClassification() {
+    Class klass = YoutubeEventClassifier.detectClass(testObjectNode);
+
+    assertEquals(klass, ObjectNode.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/serializer/YoutubeVideoSerDeTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/serializer/YoutubeVideoSerDeTest.java
 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/serializer/YoutubeVideoSerDeTest.java
new file mode 100644
index 0000000..e4affad
--- /dev/null
+++ 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/serializer/YoutubeVideoSerDeTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.serializer;
+
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.extensions.ExtensionUtil;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Provider;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.api.services.youtube.model.Video;
+import org.joda.time.DateTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for YoutubeVideoSerDe.
+ */
+public class YoutubeVideoSerDeTest {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeVideoSerDeTest.class);
+  private final String testVideo = 
"{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#video\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion
 Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a 
LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart 
failure, treatment options, and lifestyle changes. Learn more: 
https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be
 Heart Smart: Congestive Heart Failure LIVE 
Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}";
+  private ObjectMapper objectMapper;
+
+  /**
+   * Prepares the mapper used by the tests: registers
+   * {@link YoutubeVideoDeserializer} for {@link Video} and turns off failure on
+   * unknown properties.
+   */
+  @Before
+  public void setup() {
+    SimpleModule videoModule = new SimpleModule();
+    videoModule.addDeserializer(Video.class, new YoutubeVideoDeserializer());
+
+    objectMapper = StreamsJacksonMapper.getInstance();
+    objectMapper.registerModule(videoModule);
+    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
+
+  /**
+   * Deserializes the sample video, converts it into an {@link Activity} and
+   * checks the populated fields.
+   *
+   * @throws Exception if deserialization or conversion fails; the exception is
+   *     propagated so the test fails instead of being logged and passing
+   */
+  @Test
+  public void testVideoObject() throws Exception {
+    LOGGER.info("raw: {}", testVideo);
+
+    Activity activity = new Activity();
+
+    Video video = objectMapper.readValue(testVideo, Video.class);
+
+    YoutubeActivityUtil.updateActivity(video, activity, "testChannelId");
+    LOGGER.info("activity: {}", activity);
+
+    // JUnit assertions are used throughout: the `assert` keyword is skipped
+    // when the JVM runs without -ea.
+    assertNotNull(activity);
+    assertTrue(activity.getId().contains("id:youtube:post"));
+    assertEquals("post", activity.getVerb());
+
+    Provider provider = activity.getProvider();
+    assertEquals("id:providers:youtube", provider.getId());
+    assertEquals("YouTube", provider.getDisplayName());
+
+    ActivityObject actor = activity.getActor();
+    assertTrue(actor.getId().contains("id:youtube:"));
+    assertNotNull(actor.getDisplayName());
+    assertNotNull(actor.getSummary());
+
+    assertNotNull(activity.getTitle());
+    assertNotNull(activity.getUrl());
+    assertNotNull(activity.getContent());
+
+    assertEquals(DateTime.class, activity.getPublished().getClass());
+
+    Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
+
+    assertNotNull(extensions.get("youtube"));
+    assertNotNull(extensions.get("likes"));
+
+    assertTrue(testActivityObject(activity));
+  }
+
+  /**
+   * Checks the object attached to the activity.
+   *
+   * @param activity activity whose object is inspected
+   * @return true when the object is of type {@code "video"}, has an image, and
+   *     has a URL that is not the literal {@code "null"} and contains the
+   *     YouTube watch prefix
+   */
+  private boolean testActivityObject(Activity activity) {
+    ActivityObject obj = activity.getObject();
+
+    return obj.getObjectType().equals("video")
+        && obj.getImage() != null
+        && !obj.getUrl().equals("null")
+        && obj.getUrl().contains("https://www.youtube.com/watch?v=");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
index f551d93..fac0b2e 100644
--- 
a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
+++ 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
@@ -18,7 +18,8 @@
 
 package org.apache.streams.youtube.test.providers;
 
-import com.youtube.provider.YoutubeChannelProvider;
+import org.apache.streams.youtube.provider.YoutubeChannelProvider;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
index 34442a2..df1eaef 100644
--- a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
+++ b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
@@ -18,7 +18,7 @@
 
 package org.apache.streams.youtube.test.providers;
 
-import com.youtube.provider.YoutubeUserActivityProvider;
+import org.apache.streams.youtube.provider.YoutubeUserActivityProvider;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-plugins/streams-plugin-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-cassandra/pom.xml b/streams-plugins/streams-plugin-cassandra/pom.xml
index 9316bc3..8009bfd 100644
--- a/streams-plugins/streams-plugin-cassandra/pom.xml
+++ b/streams-plugins/streams-plugin-cassandra/pom.xml
@@ -157,11 +157,6 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-plugins/streams-plugin-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-elasticsearch/pom.xml b/streams-plugins/streams-plugin-elasticsearch/pom.xml
index 30d7973..7487745 100644
--- a/streams-plugins/streams-plugin-elasticsearch/pom.xml
+++ b/streams-plugins/streams-plugin-elasticsearch/pom.xml
@@ -158,11 +158,6 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-plugins/streams-plugin-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-hbase/pom.xml b/streams-plugins/streams-plugin-hbase/pom.xml
index 89c5f6c..d521c88 100644
--- a/streams-plugins/streams-plugin-hbase/pom.xml
+++ b/streams-plugins/streams-plugin-hbase/pom.xml
@@ -157,11 +157,6 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-plugins/streams-plugin-hive/pom.xml
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-hive/pom.xml b/streams-plugins/streams-plugin-hive/pom.xml
index 133f30f..7b9da30 100644
--- a/streams-plugins/streams-plugin-hive/pom.xml
+++ b/streams-plugins/streams-plugin-hive/pom.xml
@@ -157,11 +157,6 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-plugins/streams-plugin-pig/pom.xml
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-pig/pom.xml b/streams-plugins/streams-plugin-pig/pom.xml
index 57317e9..f85d00e 100644
--- a/streams-plugins/streams-plugin-pig/pom.xml
+++ b/streams-plugins/streams-plugin-pig/pom.xml
@@ -157,11 +157,6 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-plugins/streams-plugin-scala/pom.xml
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-scala/pom.xml b/streams-plugins/streams-plugin-scala/pom.xml
index b8ef652..70bd622 100644
--- a/streams-plugins/streams-plugin-scala/pom.xml
+++ b/streams-plugins/streams-plugin-scala/pom.xml
@@ -20,7 +20,6 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
     <modelVersion>4.0.0</modelVersion>
-    <groupId>org.apache.streams.plugins</groupId>
     <artifactId>streams-plugin-scala</artifactId>
     <version>0.5-incubating-SNAPSHOT</version>
     <packaging>maven-plugin</packaging>
@@ -154,11 +153,6 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-testing/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/streams-testing/src/test/resources/log4j.properties b/streams-testing/src/test/resources/log4j.properties
index c6c20ae..c183d78 100644
--- a/streams-testing/src/test/resources/log4j.properties
+++ b/streams-testing/src/test/resources/log4j.properties
@@ -18,7 +18,7 @@ log4j.logger.ch.qos.logback=ERROR
 log4j.logger.com.foursquare=WARN
 log4j.logger.com.google=WARN
 log4j.logger.com.mongodb=WARN
-log4j.logger.com.youtube=WARN
+log4j.logger.org.apache.streams.youtube=WARN
 log4j.logger.io.dropwizard=WARN
 log4j.logger.org.apache.streams=WARN
 log4j.logger.org.apache.spark=WARN

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-testing/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/streams-testing/src/test/resources/logback-test.xml b/streams-testing/src/test/resources/logback-test.xml
index 2dfa4b2..9dd8bf3 100644
--- a/streams-testing/src/test/resources/logback-test.xml
+++ b/streams-testing/src/test/resources/logback-test.xml
@@ -21,7 +21,7 @@
     <logger name="com.mongodb" level="WARN"/>
     <logger name="com.foursquare" level="WARN"/>
     <logger name="com.google" level="WARN"/>
-    <logger name="com.youtube" level="WARN"/>
+    <logger name="org.apache.streams.youtube" level="WARN"/>
     <logger name="io.dropwizard" level="WARN"/>
     <logger name="org.apache.streams" level="WARN"/>
     <logger name="org.apache.spark" level="WARN"/>


Reply via email to