Repository: ignite Updated Branches: refs/heads/ignite-843-rc1 4cc2dc98a -> a362b8b7a
IGNITE-1710 update employee, car with random values Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a362b8b7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a362b8b7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a362b8b7 Branch: refs/heads/ignite-843-rc1 Commit: a362b8b7a10b18a399301a08589faf913ef2497b Parents: 4cc2dc9 Author: Andrey <[email protected]> Authored: Tue Oct 20 11:45:07 2015 +0700 Committer: Andrey <[email protected]> Committed: Tue Oct 20 11:45:07 2015 +0700 ---------------------------------------------------------------------- .../agent/testdrive/AgentSqlTestDrive.java | 136 +++++++++++++++++-- 1 file changed, 123 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a362b8b7/modules/control-center-agent/src/main/java/org/apache/ignite/agent/testdrive/AgentSqlTestDrive.java ---------------------------------------------------------------------- diff --git a/modules/control-center-agent/src/main/java/org/apache/ignite/agent/testdrive/AgentSqlTestDrive.java b/modules/control-center-agent/src/main/java/org/apache/ignite/agent/testdrive/AgentSqlTestDrive.java index 9e387fb..9b71178 100644 --- a/modules/control-center-agent/src/main/java/org/apache/ignite/agent/testdrive/AgentSqlTestDrive.java +++ b/modules/control-center-agent/src/main/java/org/apache/ignite/agent/testdrive/AgentSqlTestDrive.java @@ -23,7 +23,13 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.ignite.Ignite; @@ -87,6 +93,12 @@ public class AgentSqlTestDrive { /** Departments count */ private static final int PARK_CNT = 10; + /** Counter for threads in pool. */ + private static final AtomicInteger THREAD_CNT = new AtomicInteger(0); + + /** */ + private static ScheduledExecutorService cachePool; + /** * Configure cacheEmployee. * @@ -291,8 +303,9 @@ public class AgentSqlTestDrive { /** * @param ignite Ignite. * @param name Cache name. + * @param range Time range in milliseconds. */ - private static void populateCacheEmployee(Ignite ignite, String name) { + private static void populateCacheEmployee(Ignite ignite, String name, long range) { log.log(Level.FINE, "TEST-DRIVE-SQL: Start population cache: '" + name + "' with data..."); IgniteCache<CountryKey, Country> cacheCountry = ignite.cache(name); @@ -311,12 +324,6 @@ public class AgentSqlTestDrive { IgniteCache<EmployeeKey, Employee> cacheEmployee = ignite.cache(name); - long off = java.sql.Date.valueOf("2007-01-01").getTime(); - - long end = java.sql.Date.valueOf("2016-01-01").getTime(); - - long diff = end - off + 1; - for (int i = 0; i < EMPL_CNT; i++) { Integer mgrId = (i == 0 || rnd.nextBoolean()) ? null : rnd.nextInt(i); @@ -324,7 +331,7 @@ public class AgentSqlTestDrive { cacheEmployee.put(new EmployeeKey(i), new Employee(i, "first name " + (i + 1), "last name " + (i + 1), "email " + (i + 1), - "phone number " + (i + 1), new java.sql.Date(off + (long)(r * diff)), "job " + (i + 1), + "phone number " + (i + 1), new java.sql.Date((long)(r * range)), "job " + (i + 1), round(r * 5000, 2) , mgrId, rnd.nextInt(DEP_CNT))); } @@ -343,16 +350,121 @@ public class AgentSqlTestDrive { for (int i = 0; i < PARK_CNT; i++) cacheParking.put(new ParkingKey(i), new Parking(i, "Parking " + (i + 1))); - IgniteCache<CarKey, Car> cacheDepartment = ignite.cache(name); + IgniteCache<CarKey, Car> cacheCar = ignite.cache(name); for (int i = 0; i < CAR_CNT; i++) - cacheDepartment.put(new CarKey(i), new Car(i, rnd.nextInt(PARK_CNT), "Car " + (i + 1))); + cacheCar.put(new CarKey(i), new Car(i, rnd.nextInt(PARK_CNT), "Car " + (i + 1))); log.log(Level.FINE, "TEST-DRIVE-SQL: Finished population cache: '" + name + "' with data."); } /** + * Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically. + * + * @param corePoolSize Number of threads to keep in the pool, even if they are idle. + * @param threadName Part of thread name that would be used by thread factory. + * @return Newly created scheduled thread pool. + */ + private static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, final String threadName) { + ScheduledExecutorService srvc = Executors.newScheduledThreadPool(corePoolSize, new ThreadFactory() { + @Override public Thread newThread(Runnable r) { + Thread thread = new Thread(r, String.format("%s-%d", threadName, THREAD_CNT.getAndIncrement())); + + thread.setDaemon(true); + + return thread; + } + }); + + ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) srvc; + + // Setting up shutdown policy. + executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + + return srvc; + } + + /** + * Starts read and write from cache in background. + * + * @param ignite Ignite. + * @param n - maximum count read/write key + */ + private static void startLoad(final Ignite ignite, final int n) { + final long diff = new java.util.Date().getTime(); + + populateCacheEmployee(ignite, EMPLOYEE_CACHE_NAME, diff); + + populateCacheCar(ignite, CAR_CACHE_NAME); + + if (cachePool != null) + cachePool.shutdownNow(); + + cachePool = newScheduledThreadPool(2, "test-drive-sql-load-cache-tasks"); + + if (cachePool != null) { + cachePool.scheduleWithFixedDelay(new Runnable() { + @Override public void run() { + try { + IgniteCache<EmployeeKey, Employee> cache = ignite.cache(EMPLOYEE_CACHE_NAME); + + if (cache != null) + for (int i = 0; i < n; i++) { + Integer employeeId = rnd.nextInt(EMPL_CNT); + + Integer mgrId = (i == 0 || rnd.nextBoolean()) ? null : rnd.nextInt(employeeId); + + double r = rnd.nextDouble(); + + cache.put(new EmployeeKey(employeeId), + new Employee(employeeId, "first name " + (i + 1), "last name " + (i + 1), + "email " + (i + 1), "phone number " + (i + 1), + new java.sql.Date((long)(r * diff)), "job " + (i + 1), + round(r * 5000, 2) , mgrId, rnd.nextInt(DEP_CNT))); + + if (rnd.nextBoolean()) + cache.remove(new EmployeeKey(rnd.nextInt(EMPL_CNT))); + } + } + catch (IllegalStateException ignored) { + } + catch (Throwable e) { + if (!e.getMessage().contains("cache is stopped")) + ignite.log().error("Cache write task execution error", e); + } + } + }, 10, 3, TimeUnit.SECONDS); + + cachePool.scheduleWithFixedDelay(new Runnable() { + @Override public void run() { + try { + IgniteCache<CarKey, Car> cache = ignite.cache(CAR_CACHE_NAME); + + if (cache != null) + for (int i = 0; i < n; i++) { + Integer carId = rnd.nextInt(CAR_CNT); + + cache.put(new CarKey(carId), new Car(carId, rnd.nextInt(PARK_CNT), "Car " + (i + 1))); + + if (rnd.nextBoolean()) + cache.remove(new CarKey(rnd.nextInt(CAR_CNT))); + } + } + catch (IllegalStateException ignored) { + } + catch (Throwable e) { + if (!e.getMessage().contains("cache is stopped")) + ignite.log().error("Cache write task execution error", e); + } + } + }, 10, 3, TimeUnit.SECONDS); + } + } + + + /** * Start ignite node with cacheEmployee and populate it with data. */ public static void testDrive(AgentConfiguration acfg) { @@ -400,9 +512,7 @@ public class AgentSqlTestDrive { log.log(Level.INFO, "TEST-DRIVE-SQL: Embedded node for sql test-drive successfully started"); - populateCacheEmployee(ignite, EMPLOYEE_CACHE_NAME); - - populateCacheCar(ignite, CAR_CACHE_NAME); + startLoad(ignite, 20); } catch (Exception e) { log.log(Level.SEVERE, "TEST-DRIVE-SQL: Failed to start embedded node for sql test-drive!", e);
